This is an automated email from the ASF dual-hosted git repository.

cdeppisch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-k.git


The following commit(s) were added to refs/heads/main by this push:
     new d2605bdd1 fix: Improve Knative trigger filter
d2605bdd1 is described below

commit d2605bdd153ab89ae594029bbcb6b9112a2146a0
Author: Christoph Deppisch <cdeppi...@redhat.com>
AuthorDate: Thu Jun 6 10:26:04 2024 +0200

    fix: Improve Knative trigger filter
    
    - Fixes #5537: Support filter attributes other than event type (e.g. 
source, subject, extensions)
    - Fixes #5529: Allows empty filter to consume the full event stream
    - Fixes #5446: Knative Trigger creation is only based on event type 
attribute
    - Fixes #5577: Consistently support "cloudEventsType" property in Pipes 
source/sink
    - Update documentation and improve Knative Kamelet/Pipe user guide
---
 .github/actions/e2e-knative-yaks/action.yml        |   2 +-
 .github/actions/kamel-cleanup/cleanup.sh           |   2 +-
 .github/actions/kamel-install-yaks/action.yml      |   2 +-
 .github/workflows/knative.yml                      |   2 +-
 .../modules/ROOT/pages/kamelets/kamelets-user.adoc | 185 +++++-
 .../modules/ROOT/pages/running/camel-runtimes.adoc |   4 +-
 docs/modules/ROOT/partials/apis/camel-k-crds.adoc  |  18 +
 docs/modules/traits/pages/knative.adoc             |  12 +
 e2e/knative/kamelet_test.go                        |   6 +-
 e2e/support/util/dump.go                           |  38 ++
 .../{yaks-config.yaml => event-source-pipe.yaml}   |  35 +-
 .../common/knative-broker/knative-pipe.feature     |  34 +
 .../{yaks-config.yaml => log-sink-pipe.yaml}       |  29 +-
 .../{yaks-config.yaml => log-sink.kamelet.yaml}    |  37 +-
 .../{yaks-config.yaml => no-filter-pipe.yaml}      |  27 +-
 .../{yaks-config.yaml => source-filter-pipe.yaml}  |  29 +-
 ...{yaks-config.yaml => timer-source.kamelet.yaml} |  46 +-
 e2e/yaks/common/knative-broker/yaks-config.yaml    |  37 +-
 .../sinkbinding-http.feature                       |   2 +-
 helm/camel-k/crds/crd-integration-platform.yaml    |  34 +
 helm/camel-k/crds/crd-integration-profile.yaml     |  34 +
 helm/camel-k/crds/crd-integration.yaml             |  17 +
 helm/camel-k/crds/crd-kamelet-binding.yaml         |  17 +
 helm/camel-k/crds/crd-pipe.yaml                    |  17 +
 pkg/apis/camel/v1/trait/knative.go                 |   8 +
 pkg/apis/camel/v1/trait/zz_generated.deepcopy.go   |  10 +
 .../camel.apache.org_integrationplatforms.yaml     |  34 +
 .../camel.apache.org_integrationprofiles.yaml      |  34 +
 .../crd/bases/camel.apache.org_integrations.yaml   |  17 +
 .../bases/camel.apache.org_kameletbindings.yaml    |  17 +
 .../config/crd/bases/camel.apache.org_pipes.yaml   |  17 +
 pkg/trait/jvm_test.go                              |   2 +-
 pkg/trait/knative.go                               |  63 +-
 pkg/trait/knative_test.go                          | 714 ++++++++++++++++++++-
 pkg/util/bindings/bindings_test.go                 |   6 +
 pkg/util/bindings/knative_ref.go                   |  71 +-
 pkg/util/bindings/knative_ref_test.go              | 182 +++++-
 pkg/util/knative/knative.go                        |  46 +-
 pkg/util/knative/uri.go                            |   2 +-
 script/Makefile                                    |   2 +-
 40 files changed, 1728 insertions(+), 163 deletions(-)

diff --git a/.github/actions/e2e-knative-yaks/action.yml 
b/.github/actions/e2e-knative-yaks/action.yml
index 83becbcad..50281fe48 100644
--- a/.github/actions/e2e-knative-yaks/action.yml
+++ b/.github/actions/e2e-knative-yaks/action.yml
@@ -106,7 +106,7 @@ runs:
     uses: ./.github/actions/kamel-install-yaks
     with:
       image-name: "docker.io/citrusframework/yaks"
-      version: 0.14.3
+      version: 0.19.1
 
   - id: report-problematic
     name: List Tests Marked As Problematic
diff --git a/.github/actions/kamel-cleanup/cleanup.sh 
b/.github/actions/kamel-cleanup/cleanup.sh
index f2a9c62e5..d38479423 100755
--- a/.github/actions/kamel-cleanup/cleanup.sh
+++ b/.github/actions/kamel-cleanup/cleanup.sh
@@ -103,6 +103,6 @@ fi
 kubectl get crds | grep camel | awk '{print $1}' | xargs kubectl delete crd &> 
/dev/null
 
 #
-# Remove KNative resources
+# Remove Knative resources
 #
 ./.github/actions/kamel-cleanup/cleanup-knative.sh
diff --git a/.github/actions/kamel-install-yaks/action.yml 
b/.github/actions/kamel-install-yaks/action.yml
index 641411511..a6babcea3 100644
--- a/.github/actions/kamel-install-yaks/action.yml
+++ b/.github/actions/kamel-install-yaks/action.yml
@@ -21,7 +21,7 @@ description: 'Install YAKS artifacts'
 inputs:
   version:
     description: "The YAKS version"
-    default: 0.14.3
+    default: 0.19.1
     required: false
   image-name:
     description: "The YAKS operator image name"
diff --git a/.github/workflows/knative.yml b/.github/workflows/knative.yml
index 5900fbff7..f70f203e6 100644
--- a/.github/workflows/knative.yml
+++ b/.github/workflows/knative.yml
@@ -88,7 +88,7 @@ jobs:
           -q "${{ github.event.inputs.log-level }}" \
           -t "${{ github.event.inputs.test-filters }}"
 
-    - name: KNative Tests
+    - name: Knative Tests
       uses: ./.github/actions/e2e-knative
       with:
         cluster-config-data: ${{ secrets.E2E_CLUSTER_CONFIG }}
diff --git a/docs/modules/ROOT/pages/kamelets/kamelets-user.adoc 
b/docs/modules/ROOT/pages/kamelets/kamelets-user.adoc
index 6011b4877..ae27db023 100644
--- a/docs/modules/ROOT/pages/kamelets/kamelets-user.adoc
+++ b/docs/modules/ROOT/pages/kamelets/kamelets-user.adoc
@@ -219,27 +219,123 @@ You can run this integration without specifying other 
parameters, the Kamelet en
 automatically mount the secret into the integration Pod.
 
 [[kamelets-usage-binding]]
-== Binding Kamelets
+== Binding Kamelets in Pipes
 
 In some contexts (for example **"serverless"**) users often want to leverage 
the power of Apache Camel to be able to connect to various sources/sinks, 
without
-doing additional processing (such as tranformations or other enterprise 
integration patterns).
+doing additional processing (such as transformations or other enterprise 
integration patterns).
 
-A common use case is that of **Knative Sources**, for which the Apache Camel 
developers maintain the 
https://knative.dev/docs/eventing/samples/apache-camel-source/[Knative 
CamelSources].
-Kamelets represent an **evolution** of the model proposed in CamelSources, but 
they allow using the same declarative style of binding, via a resource named 
**Pipe**.
+A common use case is that of **Knative Event Sources**, for which the Apache 
Camel developers provide the concept of Kamelets and Pipes.
+Kamelets represent an **evolution** of the Camel route templates to provide an 
opinionated and easy to use connector to various components and services.
+The Kamelets allow using a declarative style of binding sources and sinks 
where data produced by a source, transformed in the form of actions steps and 
pushed to a given sink, via a resource named **Pipe**.
 
-=== Binding to a Knative Destination
+=== Binding to Knative
 
-A Pipe allows to declaratively move data from a system described by a Kamelet 
towards a Knative destination (or other kind of destinations, in the future), 
or from
-a Knative channel/broker to another external system described by a Kamelet.
+A Pipe allows to move data from a system described by a Kamelet towards a 
Knative destination, or from a Knative channel/broker to another external 
system described by a Kamelet.
+This means Pipes may act as event sources and sinks for the Knative eventing 
broker in a declarative way.
 
-For example, here's an example of binding:
+For example, here is a Pipe that connects a Kamelet Telegram source to the 
Knative broker:
 
 [source,yaml]
 ----
 apiVersion: camel.apache.org/v1
 kind: Pipe
 metadata:
-  name: telegram-text-source-to-channel
+  name: telegram-to-knative
+spec:
+  source: # <1>
+    ref:
+      kind: Kamelet
+      apiVersion: camel.apache.org/v1
+      name: telegram-text-source
+    properties:
+      botToken: the-token-here
+  sink: # <2>
+    ref:
+      kind: Broker
+      apiVersion: eventing.knative.dev/v1
+      name: default
+----
+<1> Reference to the source that provides data
+<2> Reference to the sink where data should be sent to
+
+This binding takes the `telegram-text-source` Kamelet, configures it using 
specific properties ("botToken") and
+makes sure that messages produced by the Kamelet are forwarded to the Knative 
**Broker** named "default".
+
+Note that source and sink are specified as standard **Kubernetes object 
references** in a declarative way.
+
+Knative eventing uses the CloudEvents data format by default.
+You may want to set some properties that specify the event attributes such as 
the event type.
+
+[source,yaml]
+----
+apiVersion: camel.apache.org/v1
+kind: Pipe
+metadata:
+  name: telegram-to-knative
+spec:
+  source:
+    ref:
+      kind: Kamelet
+      apiVersion: camel.apache.org/v1
+      name: telegram-text-source
+    properties:
+      botToken: the-token-here
+  sink:
+    ref:
+      kind: Broker
+      apiVersion: eventing.knative.dev/v1
+      name: default
+    properties:
+      type: org.apache.camel.telegram.events # <1>
+----
+<1> Sets the event type attribute of the CloudEvent produced by this Pipe
+
+This way you may specify event attributes before publishing to the Knative 
broker.
+Note that Camel uses a default CloudEvents event type `org.apache.camel.event` 
for events produced by Camel.
+
+You can overwrite CloudEvent event attributes on the sink using the 
`ce.overwrite.` prefix when setting a property.
+
+[source,yaml]
+----
+apiVersion: camel.apache.org/v1
+kind: Pipe
+metadata:
+  name: telegram-to-knative
+spec:
+  source:
+    ref:
+      kind: Kamelet
+      apiVersion: camel.apache.org/v1
+      name: telegram-text-source
+    properties:
+      botToken: the-token-here
+  sink:
+    ref:
+      kind: Broker
+      apiVersion: eventing.knative.dev/v1
+      name: default
+    properties:
+      type: org.apache.camel.telegram.events
+      ce.overwrite.ce-source: my-source # <1>
+----
+<1> Use "ce.overwrite.ce-source" to explicitly set the CloudEvents source 
attribute.
+
+The example shows how we can reference the "telegram-text-source" resource in 
a Pipe.
+It's contained in the `source` section because it's a Kamelet of type "source".
+A Kamelet of type "sink", by contrast, can only be used in the `sink` section 
of a `Pipe`.
+
+**Under the covers, a Pipe creates an Integration** resource that implements 
the binding, but all details of how to connect with
+Telegram forwarding the data to the Knative broker is fully transparent to the 
end user. For instance the Integration uses a `SinkBinding` concept
+under the covers in order to retrieve the Knative broker endpoint URL.
+
+In the same way you can also connect a Kamelet source to a Knative channel.
+
+[source,yaml]
+----
+apiVersion: camel.apache.org/v1
+kind: Pipe
+metadata:
+  name: telegram-to-knative-channel
 spec:
   source: # <1>
     ref:
@@ -255,18 +351,75 @@ spec:
       name: messages
 ----
 <1> Reference to the source that provides data
+<2> Reference to the Knative channel that acts as the sink where data should 
be sent to
+
+When reading data from Knative you just need to specify for instance the 
Knative broker as a source in the Pipe.
+Events consumed from Knative event stream will be pushed to the given sink of 
the Pipe.
+
+[source,yaml]
+----
+apiVersion: camel.apache.org/v1
+kind: Pipe
+metadata:
+  name: knative-to-slack
+spec:
+  source: # <1>
+    ref:
+      kind: Broker
+      apiVersion: eventing.knative.dev/v1
+      name: default
+    properties:
+      type: org.apache.camel.event.messages
+  sink: # <2>
+    ref:
+      kind: Kamelet
+      apiVersion: camel.apache.org/v1
+      name: slack-sink
+    properties:
+      channel: "#my-channel"
+      webhookUrl: the-webhook-url
+----
+<1> Reference to the Knative broker source that provides data
 <2> Reference to the sink where data should be sent to
 
-This binding takes the `telegram-text-source` Kamelet, configures it using 
specific properties ("botToken") and
-makes sure that messages produced by the Kamelet are forwarded to the Knative 
**InMemoryChannel** named "messages".
+Once again, the Pipe provides a declarative way of creating event sources and 
sinks for Knative eventing.
+In the example, all events of type `org.apache.camel.event.messages` get 
forwarded to the given Slack channel using the Webhook API.
 
-Note that source and sink are specified declaratively as standard **Kubernetes 
object references**.
+When consuming events from the Knative broker you most likely need to filter 
and select the events to process.
+You can do that with the properties set on the Knative broker source 
reference, for instance filtering by the even type as shown in the example.
+The filter possibilities include CloudEvent attributes such as event type, 
source, subject and extensions.
 
-The example shows how we can reference the "telegram-text-source" resource in 
a Pipe. It's contained in the `source` section
-because it's a Kamelet of type "source".
-A Kamelet of type "sink", by contrast, can only be used in the `sink` section 
of a `Pipe`.
+In the background Camel K will automatically create a Knative Trigger resource 
for the Pipe that uses the filter attributes accordingly.
+
+.Sample trigger created by Camel K
+[source,yaml]
+----
+apiVersion: eventing.knative.dev/v1
+kind: Trigger
+metadata:
+  name: camel-event-messages
+spec:
+  broker: default # <1>
+  filter:
+    attributes:
+      type: org.apache.camel.event.messages
+      myextension: my-extension-value
+  subscriber:
+    ref:
+      apiVersion: serving.knative.dev/v1 # <2>
+      kind: Service
+      name: camel-service
+    uri: /events/camel.event.messages
+----
+<1> Reference to the Knative broker source that provides data
+<2> Reference to the Camel K integration/pipe service
+
+The trigger calls the Camel K integration service endpoint URL and pushes 
events with the given filter attributes to the Pipe.
+All properties that you have set on the Knative broker source reference will 
be set as a filter attribute on the trigger resource (except for reserved 
properties such as `name` and `cloudEventsType`).
 
-**Under the covers, a Pipe creates an Integration** resource that implements 
the binding, but this is transparent to the end user.
+Note that Camel K creates the trigger resource only for Knative broker type 
event sources.
+In case you reference a Knative channel as a source in a Pipe Camel K assumes 
that the channel and the trigger are already present.
+Camel K will only create the subscription for the integration service on the 
channel.
 
 === Binding to a Kafka Topic
 
diff --git a/docs/modules/ROOT/pages/running/camel-runtimes.adoc 
b/docs/modules/ROOT/pages/running/camel-runtimes.adoc
index c479bfa31..3eacd7987 100644
--- a/docs/modules/ROOT/pages/running/camel-runtimes.adoc
+++ b/docs/modules/ROOT/pages/running/camel-runtimes.adoc
@@ -143,13 +143,13 @@ NOTE: this is a best effort analysis taking as reference 
the work available in v
 |v
 |x
 
-|KNative Service
+|Knative Service
 |x
 |x
 |x
 |v
 
-|KNative
+|Knative
 |x
 |v
 |x
diff --git a/docs/modules/ROOT/partials/apis/camel-k-crds.adoc 
b/docs/modules/ROOT/partials/apis/camel-k-crds.adoc
index abe5aee8c..89c648300 100644
--- a/docs/modules/ROOT/partials/apis/camel-k-crds.adoc
+++ b/docs/modules/ROOT/partials/apis/camel-k-crds.adoc
@@ -7737,6 +7737,24 @@ Enables the camel-k-operator to set the 
"bindings.knative.dev/include=true" labe
 As Knative requires this label to perform injection of K_SINK URL into the 
service.
 If this is false, the integration pod may start and fail, read the SinkBinding 
Knative documentation. (default: true)
 
+|`filters` +
+[]string
+|
+
+
+Sets filter attributes on the event stream (such as event type, source, 
subject and so on).
+A list of key-value pairs that represent filter attributes and its values.
+The syntax is KEY=VALUE, e.g., `source="my.source"`.
+Filter attributes get set on the Knative trigger that is being created as part 
of this integration.
+
+|`filterEventType` +
+bool
+|
+
+
+Enables the default filtering for the Knative trigger using the event type
+If this is true, the created Knative trigger uses the event type as a filter 
on the event stream when no other filter criteria is given. (default: true)
+
 
 |===
 
diff --git a/docs/modules/traits/pages/knative.adoc 
b/docs/modules/traits/pages/knative.adoc
index d22f19f07..49b764ccb 100755
--- a/docs/modules/traits/pages/knative.adoc
+++ b/docs/modules/traits/pages/knative.adoc
@@ -87,6 +87,18 @@ It's enabled by default when the integration targets a 
single sink
 As Knative requires this label to perform injection of K_SINK URL into the 
service.
 If this is false, the integration pod may start and fail, read the SinkBinding 
Knative documentation. (default: true)
 
+| knative.filters
+| []string
+| Sets filter attributes on the event stream (such as event type, source, 
subject and so on).
+A list of key-value pairs that represent filter attributes and its values.
+The syntax is KEY=VALUE, e.g., `source="my.source"`.
+Filter attributes get set on the Knative trigger that is being created as part 
of this integration.
+
+| knative.filter-event-type
+| bool
+| Enables the default filtering for the Knative trigger using the event type
+If this is true, the created Knative trigger uses the event type as a filter 
on the event stream when no other filter criteria is given. (default: true)
+
 |===
 
 // End of autogenerated code - DO NOT EDIT! (configuration)
diff --git a/e2e/knative/kamelet_test.go b/e2e/knative/kamelet_test.go
index 6d4830638..1ae16ffe2 100644
--- a/e2e/knative/kamelet_test.go
+++ b/e2e/knative/kamelet_test.go
@@ -47,16 +47,16 @@ func TestKameletChange(t *testing.T) {
                timerSource := "my-timer-source"
                g.Expect(CreateTimerKamelet(t, ctx, operatorID, ns, 
timerSource)()).To(Succeed())
                g.Expect(CreateKnativeChannel(t, ctx, ns, 
knChannel)()).To(Succeed())
-               // Consumer route that will read from the KNative channel
+               // Consumer route that will read from the Knative channel
                g.Expect(KamelRunWithID(t, ctx, operatorID, ns, 
"files/test-kamelet-display.groovy", "-w").Execute()).To(Succeed())
                g.Eventually(IntegrationPodPhase(t, ctx, ns, 
"test-kamelet-display")).Should(Equal(corev1.PodRunning))
 
                // Create the Pipe
-               g.Expect(KamelBindWithID(t, ctx, operatorID, ns, timerSource, 
knChannelConf, "-p", "source.message=HelloKNative!", "--annotation", 
"trait.camel.apache.org/health.enabled=true", "--annotation", 
"trait.camel.apache.org/health.readiness-initial-delay=10", "--name", 
timerPipe).Execute()).To(Succeed())
+               g.Expect(KamelBindWithID(t, ctx, operatorID, ns, timerSource, 
knChannelConf, "-p", "source.message=HelloKnative!", "--annotation", 
"trait.camel.apache.org/health.enabled=true", "--annotation", 
"trait.camel.apache.org/health.readiness-initial-delay=10", "--name", 
timerPipe).Execute()).To(Succeed())
                g.Eventually(IntegrationPodPhase(t, ctx, ns, 
timerPipe)).Should(Equal(corev1.PodRunning))
                g.Eventually(IntegrationConditionStatus(t, ctx, ns, timerPipe, 
v1.IntegrationConditionReady), 
TestTimeoutShort).Should(Equal(corev1.ConditionTrue))
                // Consume the message
-               g.Eventually(IntegrationLogs(t, ctx, ns, 
"test-kamelet-display"), 
TestTimeoutShort).Should(ContainSubstring("HelloKNative!"))
+               g.Eventually(IntegrationLogs(t, ctx, ns, 
"test-kamelet-display"), 
TestTimeoutShort).Should(ContainSubstring("HelloKnative!"))
 
                g.Eventually(PipeCondition(t, ctx, ns, timerPipe, 
v1.PipeConditionReady), TestTimeoutMedium).Should(And(
                        WithTransform(PipeConditionStatusExtract, 
Equal(corev1.ConditionTrue)),
diff --git a/e2e/support/util/dump.go b/e2e/support/util/dump.go
index a5741e697..75b144db2 100644
--- a/e2e/support/util/dump.go
+++ b/e2e/support/util/dump.go
@@ -35,9 +35,12 @@ import (
 
        routev1 "github.com/openshift/api/route/v1"
        olm "github.com/operator-framework/api/pkg/operators/v1alpha1"
+       eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
+       servingv1 "knative.dev/serving/pkg/apis/serving/v1"
 
        "github.com/apache/camel-k/v2/pkg/client"
        "github.com/apache/camel-k/v2/pkg/client/camel/clientset/versioned"
+       "github.com/apache/camel-k/v2/pkg/util/knative"
        "github.com/apache/camel-k/v2/pkg/util/kubernetes"
        "github.com/apache/camel-k/v2/pkg/util/openshift"
 )
@@ -202,6 +205,41 @@ func Dump(ctx context.Context, c client.Client, ns string, 
t *testing.T) error {
                t.Logf("---\n%s\n---\n", string(pdata))
        }
 
+       // Knative resources
+       if installed, _ := knative.IsEventingInstalled(c); installed {
+               var trgs eventingv1.TriggerList
+               err = c.List(ctx, &trgs)
+               if err != nil {
+                       return err
+               }
+               t.Logf("Found %d Knative trigger:\n", len(trgs.Items))
+               for _, p := range trgs.Items {
+                       ref := p
+                       pdata, err := kubernetes.ToYAMLNoManagedFields(&ref)
+                       if err != nil {
+                               return err
+                       }
+                       t.Logf("---\n%s\n---\n", string(pdata))
+               }
+       }
+
+       if installed, _ := knative.IsServingInstalled(c); installed {
+               var ksrvs servingv1.ServiceList
+               err = c.List(ctx, &ksrvs)
+               if err != nil {
+                       return err
+               }
+               t.Logf("Found %d Knative services:\n", len(ksrvs.Items))
+               for _, p := range ksrvs.Items {
+                       ref := p
+                       pdata, err := kubernetes.ToYAMLNoManagedFields(&ref)
+                       if err != nil {
+                               return err
+                       }
+                       t.Logf("---\n%s\n---\n", string(pdata))
+               }
+       }
+
        // CamelCatalogs
        cats, err := camelClient.CamelV1().CamelCatalogs(ns).List(ctx, 
metav1.ListOptions{})
        if err != nil {
diff --git a/e2e/yaks/common/knative-broker/yaks-config.yaml 
b/e2e/yaks/common/knative-broker/event-source-pipe.yaml
similarity index 60%
copy from e2e/yaks/common/knative-broker/yaks-config.yaml
copy to e2e/yaks/common/knative-broker/event-source-pipe.yaml
index cd754db64..16d821c3a 100644
--- a/e2e/yaks/common/knative-broker/yaks-config.yaml
+++ b/e2e/yaks/common/knative-broker/event-source-pipe.yaml
@@ -15,10 +15,31 @@
 # limitations under the License.
 # ---------------------------------------------------------------------------
 
-config:
-  namespace:
-    temporary: true
-post:
-  - name: print dump
-    if: env:CI=true && failure()
-    run: yaks dump --includes app=camel-k
+apiVersion: camel.apache.org/v1
+kind: Pipe
+metadata:
+  name: event-source-pipe
+spec:
+  source:
+    ref:
+      kind: Kamelet
+      apiVersion: camel.apache.org/v1
+      name: timer-source
+    properties:
+      period: 5000
+      message: "Hello this is event-1!"
+  steps:
+    - ref:
+        kind: Kamelet
+        apiVersion: camel.apache.org/v1
+        name: log-action
+      properties:
+        showHeaders: true
+  sink:
+    ref:
+      kind: Broker
+      apiVersion: eventing.knative.dev/v1
+      name: default
+    properties:
+      type: org.apache.camel.event.messages
+      ce.override.ce-source: org.apache.camel
diff --git a/e2e/yaks/common/knative-broker/knative-pipe.feature 
b/e2e/yaks/common/knative-broker/knative-pipe.feature
new file mode 100644
index 000000000..426b7830d
--- /dev/null
+++ b/e2e/yaks/common/knative-broker/knative-pipe.feature
@@ -0,0 +1,34 @@
+Feature: Pipes connecting with Knative broker
+
+  Background:
+    Given create Knative broker default
+    Given Knative broker default is running
+    Given Camel K resource polling configuration
+      | maxAttempts          | 60   |
+      | delayBetweenAttempts | 3000 |
+
+  Scenario: Pipes exchanging events with the broker
+    # Pipe pushing events to the broker
+    Given load Pipe event-source-pipe.yaml
+    Then Camel K integration event-source-pipe should be running
+
+    # Pipe receives given event type from the broker
+    Given load Pipe log-sink-pipe.yaml
+    Then Camel K integration log-sink-pipe should be running
+    And Camel K integration log-sink-pipe should print Hello this is event-1!
+
+    # Pipe receives all events from the broker
+    Given load Pipe no-filter-pipe.yaml
+    Then Camel K integration no-filter-pipe should be running
+    And Camel K integration no-filter-pipe should print Hello this is event-1!
+
+    # Pipe receives events with source filter
+    Given load Pipe source-filter-pipe.yaml
+    Then Camel K integration source-filter-pipe should be running
+    And Camel K integration source-filter-pipe should print Hello this is 
event-1!
+
+  Scenario: Remove resources
+    Given delete Camel K integration event-source-pipe
+    Given delete Camel K integration log-sink-pipe
+    Given delete Camel K integration source-filter-pipe
+    Given delete Camel K integration no-filter-pipe
diff --git a/e2e/yaks/common/knative-broker/yaks-config.yaml 
b/e2e/yaks/common/knative-broker/log-sink-pipe.yaml
similarity index 67%
copy from e2e/yaks/common/knative-broker/yaks-config.yaml
copy to e2e/yaks/common/knative-broker/log-sink-pipe.yaml
index cd754db64..dba1cf682 100644
--- a/e2e/yaks/common/knative-broker/yaks-config.yaml
+++ b/e2e/yaks/common/knative-broker/log-sink-pipe.yaml
@@ -15,10 +15,25 @@
 # limitations under the License.
 # ---------------------------------------------------------------------------
 
-config:
-  namespace:
-    temporary: true
-post:
-  - name: print dump
-    if: env:CI=true && failure()
-    run: yaks dump --includes app=camel-k
+apiVersion: camel.apache.org/v1
+kind: Pipe
+metadata:
+  name: log-sink-pipe
+spec:
+  source:
+    ref:
+      kind: Broker
+      apiVersion: eventing.knative.dev/v1
+      name: default
+    properties:
+      type: org.apache.camel.event.messages
+  sink:
+    ref:
+      kind: Kamelet
+      apiVersion: camel.apache.org/v1
+      name: log-sink
+    properties:
+      multiline: true
+      showHeaders: true
+      showBodyType: false
+      showExchangePattern: false
diff --git a/e2e/yaks/common/knative-broker/yaks-config.yaml 
b/e2e/yaks/common/knative-broker/log-sink.kamelet.yaml
similarity index 59%
copy from e2e/yaks/common/knative-broker/yaks-config.yaml
copy to e2e/yaks/common/knative-broker/log-sink.kamelet.yaml
index cd754db64..94a61c8ae 100644
--- a/e2e/yaks/common/knative-broker/yaks-config.yaml
+++ b/e2e/yaks/common/knative-broker/log-sink.kamelet.yaml
@@ -15,10 +15,33 @@
 # limitations under the License.
 # ---------------------------------------------------------------------------
 
-config:
-  namespace:
-    temporary: true
-post:
-  - name: print dump
-    if: env:CI=true && failure()
-    run: yaks dump --includes app=camel-k
+apiVersion: camel.apache.org/v1
+kind: Kamelet
+metadata:
+  name: log-sink
+  labels:
+    camel.apache.org/kamelet.type: "sink"
+spec:
+  definition:
+    title: "Logger"
+    description: "Logs the received payload of each incoming event"
+    properties:
+      prefix:
+        title: Prefix
+        description: The prefix to prepend to the logged message
+        type: string
+        default: "message: "
+  dataTypes:
+    in:
+      types:
+        text:
+          mediaType: text/plain
+    out:
+      types:
+        text:
+          mediaType: text/plain
+  template:
+    from:
+      uri: "kamelet:source"
+      steps:
+        - log: "{{prefix}}${body}"
diff --git a/e2e/yaks/common/knative-broker/yaks-config.yaml 
b/e2e/yaks/common/knative-broker/no-filter-pipe.yaml
similarity index 70%
copy from e2e/yaks/common/knative-broker/yaks-config.yaml
copy to e2e/yaks/common/knative-broker/no-filter-pipe.yaml
index cd754db64..3638543e7 100644
--- a/e2e/yaks/common/knative-broker/yaks-config.yaml
+++ b/e2e/yaks/common/knative-broker/no-filter-pipe.yaml
@@ -15,10 +15,23 @@
 # limitations under the License.
 # ---------------------------------------------------------------------------
 
-config:
-  namespace:
-    temporary: true
-post:
-  - name: print dump
-    if: env:CI=true && failure()
-    run: yaks dump --includes app=camel-k
+apiVersion: camel.apache.org/v1
+kind: Pipe
+metadata:
+  name: no-filter-pipe
+spec:
+  source:
+    ref:
+      kind: Broker
+      apiVersion: eventing.knative.dev/v1
+      name: default
+  sink:
+    ref:
+      kind: Kamelet
+      apiVersion: camel.apache.org/v1
+      name: log-sink
+    properties:
+      multiline: true
+      showHeaders: true
+      showBodyType: false
+      showExchangePattern: false
diff --git a/e2e/yaks/common/knative-broker/yaks-config.yaml 
b/e2e/yaks/common/knative-broker/source-filter-pipe.yaml
similarity index 67%
copy from e2e/yaks/common/knative-broker/yaks-config.yaml
copy to e2e/yaks/common/knative-broker/source-filter-pipe.yaml
index cd754db64..12a78a6fb 100644
--- a/e2e/yaks/common/knative-broker/yaks-config.yaml
+++ b/e2e/yaks/common/knative-broker/source-filter-pipe.yaml
@@ -15,10 +15,25 @@
 # limitations under the License.
 # ---------------------------------------------------------------------------
 
-config:
-  namespace:
-    temporary: true
-post:
-  - name: print dump
-    if: env:CI=true && failure()
-    run: yaks dump --includes app=camel-k
+apiVersion: camel.apache.org/v1
+kind: Pipe
+metadata:
+  name: source-filter-pipe
+spec:
+  source:
+    ref:
+      kind: Broker
+      apiVersion: eventing.knative.dev/v1
+      name: default
+    properties:
+      source: org.apache.camel
+  sink:
+    ref:
+      kind: Kamelet
+      apiVersion: camel.apache.org/v1
+      name: log-sink
+    properties:
+      multiline: true
+      showHeaders: true
+      showBodyType: false
+      showExchangePattern: false
diff --git a/e2e/yaks/common/knative-broker/yaks-config.yaml 
b/e2e/yaks/common/knative-broker/timer-source.kamelet.yaml
similarity index 51%
copy from e2e/yaks/common/knative-broker/yaks-config.yaml
copy to e2e/yaks/common/knative-broker/timer-source.kamelet.yaml
index cd754db64..e6a9922a7 100644
--- a/e2e/yaks/common/knative-broker/yaks-config.yaml
+++ b/e2e/yaks/common/knative-broker/timer-source.kamelet.yaml
@@ -15,10 +15,42 @@
 # limitations under the License.
 # ---------------------------------------------------------------------------
 
-config:
-  namespace:
-    temporary: true
-post:
-  - name: print dump
-    if: env:CI=true && failure()
-    run: yaks dump --includes app=camel-k
+apiVersion: camel.apache.org/v1
+kind: Kamelet
+metadata:
+  name: timer-source
+  labels:
+    camel.apache.org/kamelet.type: "source"
+spec:
+  definition:
+    title: "Timer"
+    description: "Produces periodic events with a custom payload"
+    required:
+      - message
+    properties:
+      period:
+        title: Period
+        description: The time interval between two events
+        type: integer
+        default: 1000
+      message:
+        title: Message
+        description: The message to generate
+        type: string
+  dataTypes:
+    out:
+      types:
+        json:
+          mediaType: application/json
+          schema:
+            id: text.camel.apache.org
+            type: string
+  template:
+    from:
+      uri: timer:tick
+      parameters:
+        period: "{{period}}"
+      steps:
+        - setBody:
+            constant: "{{message}}"
+        - to: "kamelet:sink"
diff --git a/e2e/yaks/common/knative-broker/yaks-config.yaml 
b/e2e/yaks/common/knative-broker/yaks-config.yaml
index cd754db64..6c5c2ebb4 100644
--- a/e2e/yaks/common/knative-broker/yaks-config.yaml
+++ b/e2e/yaks/common/knative-broker/yaks-config.yaml
@@ -18,7 +18,36 @@
 config:
   namespace:
     temporary: true
-post:
-  - name: print dump
-    if: env:CI=true && failure()
-    run: yaks dump --includes app=camel-k
+  runtime:
+    env:
+      - name: YAKS_CAMEL_AUTO_REMOVE_RESOURCES
+        value: false
+      - name: YAKS_CAMELK_AUTO_REMOVE_RESOURCES
+        value: false
+      - name: YAKS_KAMELETS_AUTO_REMOVE_RESOURCES
+        value: false
+      - name: YAKS_KUBERNETES_AUTO_REMOVE_RESOURCES
+        value: false
+      - name: YAKS_KNATIVE_AUTO_REMOVE_RESOURCES
+        value: false
+    resources:
+      - event-source-pipe.yaml
+      - log-sink-pipe.yaml
+      - source-filter-pipe.yaml
+      - no-filter-pipe.yaml
+    settings:
+      loggers:
+        - name: INTEGRATION_STATUS
+          level: INFO
+        - name: INTEGRATION_LOGS
+          level: INFO
+  dump:
+    enabled: true
+    failedOnly: true
+    includes:
+      - app=camel-k
+pre:
+  - name: installation
+    run: |
+      kubectl apply -f timer-source.kamelet.yaml -n $YAKS_NAMESPACE
+      kubectl apply -f log-sink.kamelet.yaml -n $YAKS_NAMESPACE
diff --git a/e2e/yaks/common/knative-sinkbinding-http/sinkbinding-http.feature 
b/e2e/yaks/common/knative-sinkbinding-http/sinkbinding-http.feature
index 73e58336d..f2709fd9d 100644
--- a/e2e/yaks/common/knative-sinkbinding-http/sinkbinding-http.feature
+++ b/e2e/yaks/common/knative-sinkbinding-http/sinkbinding-http.feature
@@ -6,4 +6,4 @@ Feature: Camel K can run source HTTP endpoint in sinkbinding 
mode
       | delayBetweenAttempts | 3000 |
 
   Scenario: Integration knative-service starts with no errors
-    Given wait for condition=Ready on Kubernetes custom resource 
integration/rest2channel in integration.camel.apache.org/v1
+    Given Camel K integration rest2channel should be running
diff --git a/helm/camel-k/crds/crd-integration-platform.yaml 
b/helm/camel-k/crds/crd-integration-platform.yaml
index eeeec4364..3107e80cb 100644
--- a/helm/camel-k/crds/crd-integration-platform.yaml
+++ b/helm/camel-k/crds/crd-integration-platform.yaml
@@ -1395,11 +1395,28 @@ spec:
                         items:
                           type: string
                         type: array
+                      filterEventType:
+                        description: 'Enables the default filtering for the 
Knative
+                          trigger using the event type If this is true, the 
created
+                          Knative trigger uses the event type as a filter on 
the event
+                          stream when no other filter criteria is given. 
(default:
+                          true)'
+                        type: boolean
                       filterSourceChannels:
                         description: Enables filtering on events based on the 
header
                           "ce-knativehistory". Since this header has been 
removed
                           in newer versions of Knative, filtering is disabled 
by default.
                         type: boolean
+                      filters:
+                        description: Sets filter attributes on the event 
stream (such
+                          as event type, source, subject and so on). A list of 
key-value
+                          pairs that represent filter attributes and its 
values. The
+                          syntax is KEY=VALUE, e.g., `source="my.source"`. 
Filter
+                          attributes get set on the Knative trigger that is 
being
+                          created as part of this integration.
+                        items:
+                          type: string
+                        type: array
                       namespaceLabel:
                         description: 'Enables the camel-k-operator to set the 
"bindings.knative.dev/include=true"
                           label to the namespace As Knative requires this 
label to
@@ -3392,11 +3409,28 @@ spec:
                         items:
                           type: string
                         type: array
+                      filterEventType:
+                        description: 'Enables the default filtering for the 
Knative
+                          trigger using the event type If this is true, the 
created
+                          Knative trigger uses the event type as a filter on 
the event
+                          stream when no other filter criteria is given. 
(default:
+                          true)'
+                        type: boolean
                       filterSourceChannels:
                         description: Enables filtering on events based on the 
header
                           "ce-knativehistory". Since this header has been 
removed
                           in newer versions of Knative, filtering is disabled 
by default.
                         type: boolean
+                      filters:
+                        description: Sets filter attributes on the event 
stream (such
+                          as event type, source, subject and so on). A list of 
key-value
+                          pairs that represent filter attributes and its 
values. The
+                          syntax is KEY=VALUE, e.g., `source="my.source"`. 
Filter
+                          attributes get set on the Knative trigger that is 
being
+                          created as part of this integration.
+                        items:
+                          type: string
+                        type: array
                       namespaceLabel:
                         description: 'Enables the camel-k-operator to set the 
"bindings.knative.dev/include=true"
                           label to the namespace As Knative requires this 
label to
diff --git a/helm/camel-k/crds/crd-integration-profile.yaml 
b/helm/camel-k/crds/crd-integration-profile.yaml
index c827dbd52..2c24920a1 100644
--- a/helm/camel-k/crds/crd-integration-profile.yaml
+++ b/helm/camel-k/crds/crd-integration-profile.yaml
@@ -1272,11 +1272,28 @@ spec:
                         items:
                           type: string
                         type: array
+                      filterEventType:
+                        description: 'Enables the default filtering for the 
Knative
+                          trigger using the event type If this is true, the 
created
+                          Knative trigger uses the event type as a filter on 
the event
+                          stream when no other filter criteria is given. 
(default:
+                          true)'
+                        type: boolean
                       filterSourceChannels:
                         description: Enables filtering on events based on the 
header
                           "ce-knativehistory". Since this header has been 
removed
                           in newer versions of Knative, filtering is disabled 
by default.
                         type: boolean
+                      filters:
+                        description: Sets filter attributes on the event 
stream (such
+                          as event type, source, subject and so on). A list of 
key-value
+                          pairs that represent filter attributes and its 
values. The
+                          syntax is KEY=VALUE, e.g., `source="my.source"`. 
Filter
+                          attributes get set on the Knative trigger that is 
being
+                          created as part of this integration.
+                        items:
+                          type: string
+                        type: array
                       namespaceLabel:
                         description: 'Enables the camel-k-operator to set the 
"bindings.knative.dev/include=true"
                           label to the namespace As Knative requires this 
label to
@@ -3152,11 +3169,28 @@ spec:
                         items:
                           type: string
                         type: array
+                      filterEventType:
+                        description: 'Enables the default filtering for the 
Knative
+                          trigger using the event type If this is true, the 
created
+                          Knative trigger uses the event type as a filter on 
the event
+                          stream when no other filter criteria is given. 
(default:
+                          true)'
+                        type: boolean
                       filterSourceChannels:
                         description: Enables filtering on events based on the 
header
                           "ce-knativehistory". Since this header has been 
removed
                           in newer versions of Knative, filtering is disabled 
by default.
                         type: boolean
+                      filters:
+                        description: Sets filter attributes on the event 
stream (such
+                          as event type, source, subject and so on). A list of 
key-value
+                          pairs that represent filter attributes and its 
values. The
+                          syntax is KEY=VALUE, e.g., `source="my.source"`. 
Filter
+                          attributes get set on the Knative trigger that is 
being
+                          created as part of this integration.
+                        items:
+                          type: string
+                        type: array
                       namespaceLabel:
                         description: 'Enables the camel-k-operator to set the 
"bindings.knative.dev/include=true"
                           label to the namespace As Knative requires this 
label to
diff --git a/helm/camel-k/crds/crd-integration.yaml 
b/helm/camel-k/crds/crd-integration.yaml
index 40b06eff2..26fe73641 100644
--- a/helm/camel-k/crds/crd-integration.yaml
+++ b/helm/camel-k/crds/crd-integration.yaml
@@ -7336,11 +7336,28 @@ spec:
                         items:
                           type: string
                         type: array
+                      filterEventType:
+                        description: 'Enables the default filtering for the 
Knative
+                          trigger using the event type If this is true, the 
created
+                          Knative trigger uses the event type as a filter on 
the event
+                          stream when no other filter criteria is given. 
(default:
+                          true)'
+                        type: boolean
                       filterSourceChannels:
                         description: Enables filtering on events based on the 
header
                           "ce-knativehistory". Since this header has been 
removed
                           in newer versions of Knative, filtering is disabled 
by default.
                         type: boolean
+                      filters:
+                        description: Sets filter attributes on the event 
stream (such
+                          as event type, source, subject and so on). A list of 
key-value
+                          pairs that represent filter attributes and its 
values. The
+                          syntax is KEY=VALUE, e.g., `source="my.source"`. 
Filter
+                          attributes get set on the Knative trigger that is 
being
+                          created as part of this integration.
+                        items:
+                          type: string
+                        type: array
                       namespaceLabel:
                         description: 'Enables the camel-k-operator to set the 
"bindings.knative.dev/include=true"
                           label to the namespace As Knative requires this 
label to
diff --git a/helm/camel-k/crds/crd-kamelet-binding.yaml 
b/helm/camel-k/crds/crd-kamelet-binding.yaml
index fbfba55ca..202a701ed 100644
--- a/helm/camel-k/crds/crd-kamelet-binding.yaml
+++ b/helm/camel-k/crds/crd-kamelet-binding.yaml
@@ -7624,12 +7624,29 @@ spec:
                             items:
                               type: string
                             type: array
+                          filterEventType:
+                            description: 'Enables the default filtering for 
the Knative
+                              trigger using the event type If this is true, 
the created
+                              Knative trigger uses the event type as a filter 
on the
+                              event stream when no other filter criteria is 
given.
+                              (default: true)'
+                            type: boolean
                           filterSourceChannels:
                             description: Enables filtering on events based on 
the
                               header "ce-knativehistory". Since this header 
has been
                               removed in newer versions of Knative, filtering 
is disabled
                               by default.
                             type: boolean
+                          filters:
+                            description: Sets filter attributes on the event 
stream
+                              (such as event type, source, subject and so on). 
A list
+                              of key-value pairs that represent filter 
attributes
+                              and its values. The syntax is KEY=VALUE, e.g., 
`source="my.source"`.
+                              Filter attributes get set on the Knative trigger 
that
+                              is being created as part of this integration.
+                            items:
+                              type: string
+                            type: array
                           namespaceLabel:
                             description: 'Enables the camel-k-operator to set 
the
                               "bindings.knative.dev/include=true" label to the 
namespace
diff --git a/helm/camel-k/crds/crd-pipe.yaml b/helm/camel-k/crds/crd-pipe.yaml
index 29962f268..c2648f8c8 100644
--- a/helm/camel-k/crds/crd-pipe.yaml
+++ b/helm/camel-k/crds/crd-pipe.yaml
@@ -7622,12 +7622,29 @@ spec:
                             items:
                               type: string
                             type: array
+                          filterEventType:
+                            description: 'Enables the default filtering for 
the Knative
+                              trigger using the event type If this is true, 
the created
+                              Knative trigger uses the event type as a filter 
on the
+                              event stream when no other filter criteria is 
given.
+                              (default: true)'
+                            type: boolean
                           filterSourceChannels:
                             description: Enables filtering on events based on 
the
                               header "ce-knativehistory". Since this header 
has been
                               removed in newer versions of Knative, filtering 
is disabled
                               by default.
                             type: boolean
+                          filters:
+                            description: Sets filter attributes on the event 
stream
+                              (such as event type, source, subject and so on). 
A list
+                              of key-value pairs that represent filter 
attributes
+                              and its values. The syntax is KEY=VALUE, e.g., 
`source="my.source"`.
+                              Filter attributes get set on the Knative trigger 
that
+                              is being created as part of this integration.
+                            items:
+                              type: string
+                            type: array
                           namespaceLabel:
                             description: 'Enables the camel-k-operator to set 
the
                               "bindings.knative.dev/include=true" label to the 
namespace
diff --git a/pkg/apis/camel/v1/trait/knative.go 
b/pkg/apis/camel/v1/trait/knative.go
index b8ce7fe46..2681ac982 100644
--- a/pkg/apis/camel/v1/trait/knative.go
+++ b/pkg/apis/camel/v1/trait/knative.go
@@ -60,4 +60,12 @@ type KnativeTrait struct {
        // As Knative requires this label to perform injection of K_SINK URL 
into the service.
        // If this is false, the integration pod may start and fail, read the 
SinkBinding Knative documentation. (default: true)
        NamespaceLabel *bool `property:"namespace-label" 
json:"namespaceLabel,omitempty"`
+       // Sets filter attributes on the event stream (such as event type, 
source, subject and so on).
+       // A list of key-value pairs that represent filter attributes and its 
values.
+       // The syntax is KEY=VALUE, e.g., `source="my.source"`.
+       // Filter attributes get set on the Knative trigger that is being 
created as part of this integration.
+       Filters []string `property:"filters" json:"filters,omitempty"`
+       // Enables the default filtering for the Knative trigger using the 
event type
+       // If this is true, the created Knative trigger uses the event type as 
a filter on the event stream when no other filter criteria is given. (default: 
true)
+       FilterEventType *bool `property:"filter-event-type" 
json:"filterEventType,omitempty"`
 }
diff --git a/pkg/apis/camel/v1/trait/zz_generated.deepcopy.go 
b/pkg/apis/camel/v1/trait/zz_generated.deepcopy.go
index c8c03773e..a8db3093a 100644
--- a/pkg/apis/camel/v1/trait/zz_generated.deepcopy.go
+++ b/pkg/apis/camel/v1/trait/zz_generated.deepcopy.go
@@ -706,6 +706,16 @@ func (in *KnativeTrait) DeepCopyInto(out *KnativeTrait) {
                *out = new(bool)
                **out = **in
        }
+       if in.Filters != nil {
+               in, out := &in.Filters, &out.Filters
+               *out = make([]string, len(*in))
+               copy(*out, *in)
+       }
+       if in.FilterEventType != nil {
+               in, out := &in.FilterEventType, &out.FilterEventType
+               *out = new(bool)
+               **out = **in
+       }
 }
 
 // DeepCopy is an autogenerated deepcopy function, copying the receiver, 
creating a new KnativeTrait.
diff --git 
a/pkg/resources/config/crd/bases/camel.apache.org_integrationplatforms.yaml 
b/pkg/resources/config/crd/bases/camel.apache.org_integrationplatforms.yaml
index eeeec4364..3107e80cb 100644
--- a/pkg/resources/config/crd/bases/camel.apache.org_integrationplatforms.yaml
+++ b/pkg/resources/config/crd/bases/camel.apache.org_integrationplatforms.yaml
@@ -1395,11 +1395,28 @@ spec:
                         items:
                           type: string
                         type: array
+                      filterEventType:
+                        description: 'Enables the default filtering for the 
Knative
+                          trigger using the event type If this is true, the 
created
+                          Knative trigger uses the event type as a filter on 
the event
+                          stream when no other filter criteria is given. 
(default:
+                          true)'
+                        type: boolean
                       filterSourceChannels:
                         description: Enables filtering on events based on the 
header
                           "ce-knativehistory". Since this header has been 
removed
                           in newer versions of Knative, filtering is disabled 
by default.
                         type: boolean
+                      filters:
+                        description: Sets filter attributes on the event 
stream (such
+                          as event type, source, subject and so on). A list of 
key-value
+                          pairs that represent filter attributes and its 
values. The
+                          syntax is KEY=VALUE, e.g., `source="my.source"`. 
Filter
+                          attributes get set on the Knative trigger that is 
being
+                          created as part of this integration.
+                        items:
+                          type: string
+                        type: array
                       namespaceLabel:
                         description: 'Enables the camel-k-operator to set the 
"bindings.knative.dev/include=true"
                           label to the namespace As Knative requires this 
label to
@@ -3392,11 +3409,28 @@ spec:
                         items:
                           type: string
                         type: array
+                      filterEventType:
+                        description: 'Enables the default filtering for the 
Knative
+                          trigger using the event type If this is true, the 
created
+                          Knative trigger uses the event type as a filter on 
the event
+                          stream when no other filter criteria is given. 
(default:
+                          true)'
+                        type: boolean
                       filterSourceChannels:
                         description: Enables filtering on events based on the 
header
                           "ce-knativehistory". Since this header has been 
removed
                           in newer versions of Knative, filtering is disabled 
by default.
                         type: boolean
+                      filters:
+                        description: Sets filter attributes on the event 
stream (such
+                          as event type, source, subject and so on). A list of 
key-value
+                          pairs that represent filter attributes and its 
values. The
+                          syntax is KEY=VALUE, e.g., `source="my.source"`. 
Filter
+                          attributes get set on the Knative trigger that is 
being
+                          created as part of this integration.
+                        items:
+                          type: string
+                        type: array
                       namespaceLabel:
                         description: 'Enables the camel-k-operator to set the 
"bindings.knative.dev/include=true"
                           label to the namespace As Knative requires this 
label to
diff --git 
a/pkg/resources/config/crd/bases/camel.apache.org_integrationprofiles.yaml 
b/pkg/resources/config/crd/bases/camel.apache.org_integrationprofiles.yaml
index c827dbd52..2c24920a1 100644
--- a/pkg/resources/config/crd/bases/camel.apache.org_integrationprofiles.yaml
+++ b/pkg/resources/config/crd/bases/camel.apache.org_integrationprofiles.yaml
@@ -1272,11 +1272,28 @@ spec:
                         items:
                           type: string
                         type: array
+                      filterEventType:
+                        description: 'Enables the default filtering for the 
Knative
+                          trigger using the event type If this is true, the 
created
+                          Knative trigger uses the event type as a filter on 
the event
+                          stream when no other filter criteria is given. 
(default:
+                          true)'
+                        type: boolean
                       filterSourceChannels:
                         description: Enables filtering on events based on the 
header
                           "ce-knativehistory". Since this header has been 
removed
                           in newer versions of Knative, filtering is disabled 
by default.
                         type: boolean
+                      filters:
+                        description: Sets filter attributes on the event 
stream (such
+                          as event type, source, subject and so on). A list of 
key-value
+                          pairs that represent filter attributes and its 
values. The
+                          syntax is KEY=VALUE, e.g., `source="my.source"`. 
Filter
+                          attributes get set on the Knative trigger that is 
being
+                          created as part of this integration.
+                        items:
+                          type: string
+                        type: array
                       namespaceLabel:
                         description: 'Enables the camel-k-operator to set the 
"bindings.knative.dev/include=true"
                           label to the namespace As Knative requires this 
label to
@@ -3152,11 +3169,28 @@ spec:
                         items:
                           type: string
                         type: array
+                      filterEventType:
+                        description: 'Enables the default filtering for the 
Knative
+                          trigger using the event type If this is true, the 
created
+                          Knative trigger uses the event type as a filter on 
the event
+                          stream when no other filter criteria is given. 
(default:
+                          true)'
+                        type: boolean
                       filterSourceChannels:
                         description: Enables filtering on events based on the 
header
                           "ce-knativehistory". Since this header has been 
removed
                           in newer versions of Knative, filtering is disabled 
by default.
                         type: boolean
+                      filters:
+                        description: Sets filter attributes on the event 
stream (such
+                          as event type, source, subject and so on). A list of 
key-value
+                          pairs that represent filter attributes and its 
values. The
+                          syntax is KEY=VALUE, e.g., `source="my.source"`. 
Filter
+                          attributes get set on the Knative trigger that is 
being
+                          created as part of this integration.
+                        items:
+                          type: string
+                        type: array
                       namespaceLabel:
                         description: 'Enables the camel-k-operator to set the 
"bindings.knative.dev/include=true"
                           label to the namespace As Knative requires this 
label to
diff --git a/pkg/resources/config/crd/bases/camel.apache.org_integrations.yaml 
b/pkg/resources/config/crd/bases/camel.apache.org_integrations.yaml
index 40b06eff2..26fe73641 100644
--- a/pkg/resources/config/crd/bases/camel.apache.org_integrations.yaml
+++ b/pkg/resources/config/crd/bases/camel.apache.org_integrations.yaml
@@ -7336,11 +7336,28 @@ spec:
                         items:
                           type: string
                         type: array
+                      filterEventType:
+                        description: 'Enables the default filtering for the 
Knative
+                          trigger using the event type If this is true, the 
created
+                          Knative trigger uses the event type as a filter on 
the event
+                          stream when no other filter criteria is given. 
(default:
+                          true)'
+                        type: boolean
                       filterSourceChannels:
                         description: Enables filtering on events based on the 
header
                           "ce-knativehistory". Since this header has been 
removed
                           in newer versions of Knative, filtering is disabled 
by default.
                         type: boolean
+                      filters:
+                        description: Sets filter attributes on the event 
stream (such
+                          as event type, source, subject and so on). A list of 
key-value
+                          pairs that represent filter attributes and its 
values. The
+                          syntax is KEY=VALUE, e.g., `source="my.source"`. 
Filter
+                          attributes get set on the Knative trigger that is 
being
+                          created as part of this integration.
+                        items:
+                          type: string
+                        type: array
                       namespaceLabel:
                         description: 'Enables the camel-k-operator to set the 
"bindings.knative.dev/include=true"
                           label to the namespace As Knative requires this 
label to
diff --git 
a/pkg/resources/config/crd/bases/camel.apache.org_kameletbindings.yaml 
b/pkg/resources/config/crd/bases/camel.apache.org_kameletbindings.yaml
index fbfba55ca..202a701ed 100644
--- a/pkg/resources/config/crd/bases/camel.apache.org_kameletbindings.yaml
+++ b/pkg/resources/config/crd/bases/camel.apache.org_kameletbindings.yaml
@@ -7624,12 +7624,29 @@ spec:
                             items:
                               type: string
                             type: array
+                          filterEventType:
+                            description: 'Enables the default filtering for 
the Knative
+                              trigger using the event type If this is true, 
the created
+                              Knative trigger uses the event type as a filter 
on the
+                              event stream when no other filter criteria is 
given.
+                              (default: true)'
+                            type: boolean
                           filterSourceChannels:
                             description: Enables filtering on events based on 
the
                               header "ce-knativehistory". Since this header 
has been
                               removed in newer versions of Knative, filtering 
is disabled
                               by default.
                             type: boolean
+                          filters:
+                            description: Sets filter attributes on the event 
stream
+                              (such as event type, source, subject and so on). 
A list
+                              of key-value pairs that represent filter 
attributes
+                              and its values. The syntax is KEY=VALUE, e.g., 
`source="my.source"`.
+                              Filter attributes get set on the Knative trigger 
that
+                              is being created as part of this integration.
+                            items:
+                              type: string
+                            type: array
                           namespaceLabel:
                             description: 'Enables the camel-k-operator to set 
the
                               "bindings.knative.dev/include=true" label to the 
namespace
diff --git a/pkg/resources/config/crd/bases/camel.apache.org_pipes.yaml 
b/pkg/resources/config/crd/bases/camel.apache.org_pipes.yaml
index 29962f268..c2648f8c8 100644
--- a/pkg/resources/config/crd/bases/camel.apache.org_pipes.yaml
+++ b/pkg/resources/config/crd/bases/camel.apache.org_pipes.yaml
@@ -7622,12 +7622,29 @@ spec:
                             items:
                               type: string
                             type: array
+                          filterEventType:
+                            description: 'Enables the default filtering for 
the Knative
+                              trigger using the event type If this is true, 
the created
+                              Knative trigger uses the event type as a filter 
on the
+                              event stream when no other filter criteria is 
given.
+                              (default: true)'
+                            type: boolean
                           filterSourceChannels:
                             description: Enables filtering on events based on 
the
                               header "ce-knativehistory". Since this header 
has been
                               removed in newer versions of Knative, filtering 
is disabled
                               by default.
                             type: boolean
+                          filters:
+                            description: Sets filter attributes on the event 
stream
+                              (such as event type, source, subject and so on). 
A list
+                              of key-value pairs that represent filter 
attributes
+                              and its values. The syntax is KEY=VALUE, e.g., 
`source="my.source"`.
+                              Filter attributes get set on the Knative trigger 
that
+                              is being created as part of this integration.
+                            items:
+                              type: string
+                            type: array
                           namespaceLabel:
                             description: 'Enables the camel-k-operator to set 
the
                               "bindings.knative.dev/include=true" label to the 
namespace
diff --git a/pkg/trait/jvm_test.go b/pkg/trait/jvm_test.go
index ff5fae279..9206e4514 100644
--- a/pkg/trait/jvm_test.go
+++ b/pkg/trait/jvm_test.go
@@ -309,7 +309,7 @@ func TestApplyJvmTraitWithDeploymentResource(t *testing.T) {
        }, d.Spec.Template.Spec.Containers[0].Args)
 }
 
-func TestApplyJvmTraitWithKNativeResource(t *testing.T) {
+func TestApplyJvmTraitWithKnativeResource(t *testing.T) {
        trait, environment := 
createNominalJvmTest(v1.IntegrationKitTypePlatform)
 
        s := serving.Service{}
diff --git a/pkg/trait/knative.go b/pkg/trait/knative.go
index db87ad504..c8aa3fc87 100644
--- a/pkg/trait/knative.go
+++ b/pkg/trait/knative.go
@@ -25,6 +25,7 @@ import (
        "strings"
 
        "github.com/apache/camel-k/v2/pkg/util/boolean"
+       "github.com/apache/camel-k/v2/pkg/util/property"
 
        corev1 "k8s.io/api/core/v1"
        k8serrors "k8s.io/apimachinery/pkg/api/errors"
@@ -501,41 +502,53 @@ func (t *knativeTrait) configureSinkBinding(e 
*Environment, env *knativeapi.Came
 }
 
 func (t *knativeTrait) createTrigger(e *Environment, ref 
*corev1.ObjectReference, eventType string, path string) error {
-       // TODO extend to additional filters too, to filter them at source and 
not at destination
        found := e.Resources.HasKnativeTrigger(func(trigger *eventing.Trigger) 
bool {
                return trigger.Spec.Broker == ref.Name &&
-                       trigger.Spec.Filter != nil &&
-                       trigger.Spec.Filter.Attributes["type"] == eventType // 
can be also missing
+                       trigger.Name == knativeutil.GetTriggerName(ref.Name, 
e.Integration.Name, eventType)
        })
-       if !found {
-               if ref.Namespace == "" {
-                       ref.Namespace = e.Integration.Namespace
-               }
 
-               controllerStrategy, err := e.DetermineControllerStrategy()
+       if found {
+               return nil
+       }
+
+       if ref.Namespace == "" {
+               ref.Namespace = e.Integration.Namespace
+       }
+
+       controllerStrategy, err := e.DetermineControllerStrategy()
+       if err != nil {
+               return err
+       }
+
+       var attributes = make(map[string]string)
+       for _, filterExpression := range t.Filters {
+               key, value := property.SplitPropertyFileEntry(filterExpression)
+               attributes[key] = value
+       }
+
+       if _, eventTypeSpecified := attributes["type"]; !eventTypeSpecified && 
pointer.BoolDeref(t.FilterEventType, true) && eventType != "" {
+               // Apply default trigger filter attribute for the event type
+               attributes["type"] = eventType
+       }
+
+       var trigger *eventing.Trigger
+       switch controllerStrategy {
+       case ControllerStrategyKnativeService:
+               trigger, err = knativeutil.CreateKnativeServiceTrigger(*ref, 
e.Integration.Name, eventType, path, attributes)
                if err != nil {
                        return err
                }
-
-               var trigger *eventing.Trigger
-               switch controllerStrategy {
-               case ControllerStrategyKnativeService:
-                       trigger, err = 
knativeutil.CreateKnativeServiceTrigger(*ref, e.Integration.Name, eventType, 
path)
-                       if err != nil {
-                               return err
-                       }
-               case ControllerStrategyDeployment:
-                       trigger, err = knativeutil.CreateServiceTrigger(*ref, 
e.Integration.Name, eventType, path)
-                       if err != nil {
-                               return err
-                       }
-               default:
-                       return fmt.Errorf("failed to create Knative trigger: 
unsupported controller strategy %s", controllerStrategy)
+       case ControllerStrategyDeployment:
+               trigger, err = knativeutil.CreateServiceTrigger(*ref, 
e.Integration.Name, eventType, path, attributes)
+               if err != nil {
+                       return err
                }
-
-               e.Resources.Add(trigger)
+       default:
+               return fmt.Errorf("failed to create Knative trigger: 
unsupported controller strategy %s", controllerStrategy)
        }
 
+       e.Resources.Add(trigger)
+
        return nil
 }
 
diff --git a/pkg/trait/knative_test.go b/pkg/trait/knative_test.go
index d580a3f24..bcecc5581 100644
--- a/pkg/trait/knative_test.go
+++ b/pkg/trait/knative_test.go
@@ -43,6 +43,7 @@ import (
        "github.com/apache/camel-k/v2/pkg/client"
        "github.com/apache/camel-k/v2/pkg/util/boolean"
        "github.com/apache/camel-k/v2/pkg/util/camel"
+       "github.com/apache/camel-k/v2/pkg/util/knative"
        k8sutils "github.com/apache/camel-k/v2/pkg/util/kubernetes"
        "github.com/apache/camel-k/v2/pkg/util/test"
 )
@@ -272,7 +273,7 @@ func TestKnativeEnvConfigurationFromSource(t *testing.T) {
        }))
 }
 
-func TestKnativeTriggerConfiguration(t *testing.T) {
+func TestKnativeTriggerExplicitFilterConfig(t *testing.T) {
        catalog, err := camel.DefaultCatalog()
        require.NoError(t, err)
 
@@ -316,6 +317,7 @@ func TestKnativeTriggerConfiguration(t *testing.T) {
                                                Trait: traitv1.Trait{
                                                        Enabled: 
pointer.Bool(true),
                                                },
+                                               Filters: 
[]string{"source=my-source"},
                                        },
                                },
                        },
@@ -352,20 +354,688 @@ func TestKnativeTriggerConfiguration(t *testing.T) {
        assert.NotEmpty(t, environment.ExecutedTraits)
        assert.NotNil(t, environment.GetTrait("knative"))
 
-       assert.NotNil(t, environment.Resources.GetKnativeTrigger(func(trigger 
*eventing.Trigger) bool {
-               matching := true
+       trigger := environment.Resources.GetKnativeTrigger(func(trigger 
*eventing.Trigger) bool {
+               return trigger.Name == knative.GetTriggerName("default", 
"test", "evt.type")
+       })
 
-               matching = matching && assert.Equal(t, "default", 
trigger.Spec.Broker)
-               matching = matching && assert.Equal(t, 
serving.SchemeGroupVersion.String(), trigger.Spec.Subscriber.Ref.APIVersion)
-               matching = matching && assert.Equal(t, "Service", 
trigger.Spec.Subscriber.Ref.Kind)
-               matching = matching && assert.Equal(t, "/events/evt.type", 
trigger.Spec.Subscriber.URI.Path)
-               matching = matching && assert.Equal(t, "default-test-evttype", 
trigger.Name)
+       assert.NotNil(t, trigger)
 
-               return matching
-       }))
+       assert.Equal(t, "default", trigger.Spec.Broker)
+       assert.Equal(t, serving.SchemeGroupVersion.String(), 
trigger.Spec.Subscriber.Ref.APIVersion)
+       assert.Equal(t, "Service", trigger.Spec.Subscriber.Ref.Kind)
+       assert.Equal(t, "/events/evt.type", trigger.Spec.Subscriber.URI.Path)
+       assert.Equal(t, "default-test-evttype", trigger.Name)
+
+       assert.NotNil(t, trigger.Spec.Filter)
+       assert.Len(t, trigger.Spec.Filter.Attributes, 2)
+       assert.Equal(t, trigger.Spec.Filter.Attributes["type"], "evt.type")
+       assert.Equal(t, trigger.Spec.Filter.Attributes["source"], "my-source")
+}
+
+func TestKnativeTriggerExplicitFilterConfigNoEventTypeFilter(t *testing.T) {
+       catalog, err := camel.DefaultCatalog()
+       require.NoError(t, err)
+
+       c, err := NewFakeClient("ns")
+       require.NoError(t, err)
+
+       traitCatalog := NewCatalog(c)
+
+       environment := Environment{
+               CamelCatalog: catalog,
+               Catalog:      traitCatalog,
+               Client:       c,
+               Integration: &v1.Integration{
+                       ObjectMeta: metav1.ObjectMeta{
+                               Name:      "test",
+                               Namespace: "ns",
+                       },
+                       Status: v1.IntegrationStatus{
+                               Phase: v1.IntegrationPhaseDeploying,
+                       },
+                       Spec: v1.IntegrationSpec{
+                               Profile: v1.TraitProfileKnative,
+                               Sources: []v1.SourceSpec{
+                                       {
+                                               DataSpec: v1.DataSpec{
+                                                       Name: "route.java",
+                                                       Content: `
+                                                               public class 
CartoonMessagesMover extends RouteBuilder {
+                                                                       public 
void configure() {
+                                                                               
from("knative:event/evt.type")
+                                                                               
        .log("${body}");
+                                                                       }
+                                                               }
+                                                       `,
+                                               },
+                                               Language: v1.LanguageJavaSource,
+                                       },
+                               },
+                               Traits: v1.Traits{
+                                       Knative: &traitv1.KnativeTrait{
+                                               Trait: traitv1.Trait{
+                                                       Enabled: 
pointer.Bool(true),
+                                               },
+                                               Filters:         
[]string{"source=my-source"},
+                                               FilterEventType: 
pointer.Bool(false),
+                                       },
+                               },
+                       },
+               },
+               IntegrationKit: &v1.IntegrationKit{
+                       Status: v1.IntegrationKitStatus{
+                               Phase: v1.IntegrationKitPhaseReady,
+                       },
+               },
+               Platform: &v1.IntegrationPlatform{
+                       Spec: v1.IntegrationPlatformSpec{
+                               Cluster: v1.IntegrationPlatformClusterOpenShift,
+                               Build: v1.IntegrationPlatformBuildSpec{
+                                       PublishStrategy: 
v1.IntegrationPlatformBuildPublishStrategyS2I,
+                                       Registry:        
v1.RegistrySpec{Address: "registry"},
+                                       RuntimeVersion:  
catalog.Runtime.Version,
+                               },
+                               Profile: v1.TraitProfileKnative,
+                       },
+                       Status: v1.IntegrationPlatformStatus{
+                               Phase: v1.IntegrationPlatformPhaseReady,
+                       },
+               },
+               EnvVars:        make([]corev1.EnvVar, 0),
+               ExecutedTraits: make([]Trait, 0),
+               Resources:      k8sutils.NewCollection(),
+       }
+       environment.Platform.ResyncStatusFullConfig()
+
+       // don't care about conditions in this unit test
+       _, err = traitCatalog.apply(&environment)
+
+       require.NoError(t, err)
+       assert.NotEmpty(t, environment.ExecutedTraits)
+       assert.NotNil(t, environment.GetTrait("knative"))
+
+       trigger := environment.Resources.GetKnativeTrigger(func(trigger 
*eventing.Trigger) bool {
+               return trigger.Name == knative.GetTriggerName("default", 
"test", "evt.type")
+       })
+
+       assert.NotNil(t, trigger)
+
+       assert.Equal(t, "default", trigger.Spec.Broker)
+       assert.Equal(t, serving.SchemeGroupVersion.String(), 
trigger.Spec.Subscriber.Ref.APIVersion)
+       assert.Equal(t, "Service", trigger.Spec.Subscriber.Ref.Kind)
+       assert.Equal(t, "/events/evt.type", trigger.Spec.Subscriber.URI.Path)
+       assert.Equal(t, "default-test-evttype", trigger.Name)
+
+       assert.NotNil(t, trigger.Spec.Filter)
+       assert.Len(t, trigger.Spec.Filter.Attributes, 1)
+       assert.Equal(t, trigger.Spec.Filter.Attributes["source"], "my-source")
+}
+
+func TestKnativeTriggerDefaultEventTypeFilter(t *testing.T) {
+       catalog, err := camel.DefaultCatalog()
+       require.NoError(t, err)
+
+       c, err := NewFakeClient("ns")
+       require.NoError(t, err)
+
+       traitCatalog := NewCatalog(c)
+
+       environment := Environment{
+               CamelCatalog: catalog,
+               Catalog:      traitCatalog,
+               Client:       c,
+               Integration: &v1.Integration{
+                       ObjectMeta: metav1.ObjectMeta{
+                               Name:      "test",
+                               Namespace: "ns",
+                       },
+                       Status: v1.IntegrationStatus{
+                               Phase: v1.IntegrationPhaseDeploying,
+                       },
+                       Spec: v1.IntegrationSpec{
+                               Profile: v1.TraitProfileKnative,
+                               Sources: []v1.SourceSpec{
+                                       {
+                                               DataSpec: v1.DataSpec{
+                                                       Name: "route.java",
+                                                       Content: `
+                                                               public class 
CartoonMessagesMover extends RouteBuilder {
+                                                                       public 
void configure() {
+                                                                               
from("knative:event/evt.type")
+                                                                               
        .log("${body}");
+                                                                       }
+                                                               }
+                                                       `,
+                                               },
+                                               Language: v1.LanguageJavaSource,
+                                       },
+                               },
+                               Traits: v1.Traits{
+                                       Knative: &traitv1.KnativeTrait{
+                                               Trait: traitv1.Trait{
+                                                       Enabled: 
pointer.Bool(true),
+                                               },
+                                       },
+                               },
+                       },
+               },
+               IntegrationKit: &v1.IntegrationKit{
+                       Status: v1.IntegrationKitStatus{
+                               Phase: v1.IntegrationKitPhaseReady,
+                       },
+               },
+               Platform: &v1.IntegrationPlatform{
+                       Spec: v1.IntegrationPlatformSpec{
+                               Cluster: v1.IntegrationPlatformClusterOpenShift,
+                               Build: v1.IntegrationPlatformBuildSpec{
+                                       PublishStrategy: 
v1.IntegrationPlatformBuildPublishStrategyS2I,
+                                       Registry:        
v1.RegistrySpec{Address: "registry"},
+                                       RuntimeVersion:  
catalog.Runtime.Version,
+                               },
+                               Profile: v1.TraitProfileKnative,
+                       },
+                       Status: v1.IntegrationPlatformStatus{
+                               Phase: v1.IntegrationPlatformPhaseReady,
+                       },
+               },
+               EnvVars:        make([]corev1.EnvVar, 0),
+               ExecutedTraits: make([]Trait, 0),
+               Resources:      k8sutils.NewCollection(),
+       }
+       environment.Platform.ResyncStatusFullConfig()
+
+       // don't care about conditions in this unit test
+       _, err = traitCatalog.apply(&environment)
+
+       require.NoError(t, err)
+       assert.NotEmpty(t, environment.ExecutedTraits)
+       assert.NotNil(t, environment.GetTrait("knative"))
+
+       trigger := environment.Resources.GetKnativeTrigger(func(trigger 
*eventing.Trigger) bool {
+               return trigger.Name == knative.GetTriggerName("default", 
"test", "evt.type")
+       })
+
+       assert.NotNil(t, trigger)
+       assert.Equal(t, "default", trigger.Spec.Broker)
+       assert.Equal(t, serving.SchemeGroupVersion.String(), 
trigger.Spec.Subscriber.Ref.APIVersion)
+       assert.Equal(t, "Service", trigger.Spec.Subscriber.Ref.Kind)
+       assert.Equal(t, "/events/evt.type", trigger.Spec.Subscriber.URI.Path)
+       assert.Equal(t, "default-test-evttype", trigger.Name)
+
+       assert.NotNil(t, trigger.Spec.Filter)
+       assert.Len(t, trigger.Spec.Filter.Attributes, 1)
+       assert.Equal(t, "evt.type", trigger.Spec.Filter.Attributes["type"])
+}
+
+func TestKnativeTriggerDefaultEventTypeFilterDisabled(t *testing.T) {
+       catalog, err := camel.DefaultCatalog()
+       require.NoError(t, err)
+
+       c, err := NewFakeClient("ns")
+       require.NoError(t, err)
+
+       traitCatalog := NewCatalog(c)
+
+       environment := Environment{
+               CamelCatalog: catalog,
+               Catalog:      traitCatalog,
+               Client:       c,
+               Integration: &v1.Integration{
+                       ObjectMeta: metav1.ObjectMeta{
+                               Name:      "test",
+                               Namespace: "ns",
+                       },
+                       Status: v1.IntegrationStatus{
+                               Phase: v1.IntegrationPhaseDeploying,
+                       },
+                       Spec: v1.IntegrationSpec{
+                               Profile: v1.TraitProfileKnative,
+                               Sources: []v1.SourceSpec{
+                                       {
+                                               DataSpec: v1.DataSpec{
+                                                       Name: "route.java",
+                                                       Content: `
+                                                               public class 
CartoonMessagesMover extends RouteBuilder {
+                                                                       public 
void configure() {
+                                                                               
from("knative:event/evt.type")
+                                                                               
        .log("${body}");
+                                                                       }
+                                                               }
+                                                       `,
+                                               },
+                                               Language: v1.LanguageJavaSource,
+                                       },
+                               },
+                               Traits: v1.Traits{
+                                       Knative: &traitv1.KnativeTrait{
+                                               Trait: traitv1.Trait{
+                                                       Enabled: 
pointer.Bool(true),
+                                               },
+                                               FilterEventType: 
pointer.Bool(false),
+                                       },
+                               },
+                       },
+               },
+               IntegrationKit: &v1.IntegrationKit{
+                       Status: v1.IntegrationKitStatus{
+                               Phase: v1.IntegrationKitPhaseReady,
+                       },
+               },
+               Platform: &v1.IntegrationPlatform{
+                       Spec: v1.IntegrationPlatformSpec{
+                               Cluster: v1.IntegrationPlatformClusterOpenShift,
+                               Build: v1.IntegrationPlatformBuildSpec{
+                                       PublishStrategy: 
v1.IntegrationPlatformBuildPublishStrategyS2I,
+                                       Registry:        
v1.RegistrySpec{Address: "registry"},
+                                       RuntimeVersion:  
catalog.Runtime.Version,
+                               },
+                               Profile: v1.TraitProfileKnative,
+                       },
+                       Status: v1.IntegrationPlatformStatus{
+                               Phase: v1.IntegrationPlatformPhaseReady,
+                       },
+               },
+               EnvVars:        make([]corev1.EnvVar, 0),
+               ExecutedTraits: make([]Trait, 0),
+               Resources:      k8sutils.NewCollection(),
+       }
+       environment.Platform.ResyncStatusFullConfig()
+
+       // don't care about conditions in this unit test
+       _, err = traitCatalog.apply(&environment)
+
+       require.NoError(t, err)
+       assert.NotEmpty(t, environment.ExecutedTraits)
+       assert.NotNil(t, environment.GetTrait("knative"))
+
+       trigger := environment.Resources.GetKnativeTrigger(func(trigger 
*eventing.Trigger) bool {
+               return trigger.Name == knative.GetTriggerName("default", 
"test", "evt.type")
+       })
+
+       assert.NotNil(t, trigger)
+       assert.Equal(t, "default", trigger.Spec.Broker)
+       assert.Equal(t, serving.SchemeGroupVersion.String(), 
trigger.Spec.Subscriber.Ref.APIVersion)
+       assert.Equal(t, "Service", trigger.Spec.Subscriber.Ref.Kind)
+       assert.Equal(t, "/events/evt.type", trigger.Spec.Subscriber.URI.Path)
+       assert.Equal(t, "default-test-evttype", trigger.Name)
+
+       assert.Nil(t, trigger.Spec.Filter)
 }
 
-func TestKnativeTriggerConfigurationNoServingAvailable(t *testing.T) {
+func TestKnativeMultipleTrigger(t *testing.T) {
+       catalog, err := camel.DefaultCatalog()
+       require.NoError(t, err)
+
+       c, err := NewFakeClient("ns")
+       require.NoError(t, err)
+
+       traitCatalog := NewCatalog(c)
+
+       environment := Environment{
+               CamelCatalog: catalog,
+               Catalog:      traitCatalog,
+               Client:       c,
+               Integration: &v1.Integration{
+                       ObjectMeta: metav1.ObjectMeta{
+                               Name:      "test",
+                               Namespace: "ns",
+                       },
+                       Status: v1.IntegrationStatus{
+                               Phase: v1.IntegrationPhaseDeploying,
+                       },
+                       Spec: v1.IntegrationSpec{
+                               Profile: v1.TraitProfileKnative,
+                               Sources: []v1.SourceSpec{
+                                       {
+                                               DataSpec: v1.DataSpec{
+                                                       Name: "route.java",
+                                                       Content: `
+                                                               public class 
CartoonMessagesMover extends RouteBuilder {
+                                                                       public 
void configure() {
+                                                                               
from("knative:event/evt.type.1")
+                                                                               
        .log("${body}");
+
+                                                                               
from("knative:event/evt.type.2")
+                                                                               
        .log("${body}");
+
+                                                                               
from("knative:event")
+                                                                               
        .log("${body}");
+                                                                       }
+                                                               }
+                                                       `,
+                                               },
+                                               Language: v1.LanguageJavaSource,
+                                       },
+                               },
+                               Traits: v1.Traits{
+                                       Knative: &traitv1.KnativeTrait{
+                                               Trait: traitv1.Trait{
+                                                       Enabled: 
pointer.Bool(true),
+                                               },
+                                       },
+                               },
+                       },
+               },
+               IntegrationKit: &v1.IntegrationKit{
+                       Status: v1.IntegrationKitStatus{
+                               Phase: v1.IntegrationKitPhaseReady,
+                       },
+               },
+               Platform: &v1.IntegrationPlatform{
+                       Spec: v1.IntegrationPlatformSpec{
+                               Cluster: v1.IntegrationPlatformClusterOpenShift,
+                               Build: v1.IntegrationPlatformBuildSpec{
+                                       PublishStrategy: 
v1.IntegrationPlatformBuildPublishStrategyS2I,
+                                       Registry:        
v1.RegistrySpec{Address: "registry"},
+                                       RuntimeVersion:  
catalog.Runtime.Version,
+                               },
+                               Profile: v1.TraitProfileKnative,
+                       },
+                       Status: v1.IntegrationPlatformStatus{
+                               Phase: v1.IntegrationPlatformPhaseReady,
+                       },
+               },
+               EnvVars:        make([]corev1.EnvVar, 0),
+               ExecutedTraits: make([]Trait, 0),
+               Resources:      k8sutils.NewCollection(),
+       }
+       environment.Platform.ResyncStatusFullConfig()
+
+       // don't care about conditions in this unit test
+       _, err = traitCatalog.apply(&environment)
+
+       require.NoError(t, err)
+       assert.NotEmpty(t, environment.ExecutedTraits)
+       assert.NotNil(t, environment.GetTrait("knative"))
+
+       triggerNames := make([]string, 0)
+       environment.Resources.VisitKnativeTrigger(func(trigger 
*eventing.Trigger) {
+               triggerNames = append(triggerNames, trigger.Name)
+       })
+
+       assert.Len(t, triggerNames, 3)
+
+       trigger1 := environment.Resources.GetKnativeTrigger(func(trigger 
*eventing.Trigger) bool {
+               return trigger.Name == knative.GetTriggerName("default", 
"test", "evt.type.1")
+       })
+
+       assert.NotNil(t, trigger1)
+       assert.Equal(t, "default", trigger1.Spec.Broker)
+       assert.Equal(t, serving.SchemeGroupVersion.String(), 
trigger1.Spec.Subscriber.Ref.APIVersion)
+       assert.Equal(t, "Service", trigger1.Spec.Subscriber.Ref.Kind)
+       assert.Equal(t, "/events/evt.type.1", trigger1.Spec.Subscriber.URI.Path)
+       assert.Equal(t, "default-test-evttype1", trigger1.Name)
+
+       assert.NotNil(t, trigger1.Spec.Filter)
+       assert.Len(t, trigger1.Spec.Filter.Attributes, 1)
+       assert.Equal(t, "evt.type.1", trigger1.Spec.Filter.Attributes["type"])
+
+       trigger2 := environment.Resources.GetKnativeTrigger(func(trigger 
*eventing.Trigger) bool {
+               return trigger.Name == knative.GetTriggerName("default", 
"test", "evt.type.2")
+       })
+
+       assert.NotNil(t, trigger2)
+       assert.Equal(t, "default", trigger2.Spec.Broker)
+       assert.Equal(t, serving.SchemeGroupVersion.String(), 
trigger2.Spec.Subscriber.Ref.APIVersion)
+       assert.Equal(t, "Service", trigger2.Spec.Subscriber.Ref.Kind)
+       assert.Equal(t, "/events/evt.type.2", trigger2.Spec.Subscriber.URI.Path)
+       assert.Equal(t, "default-test-evttype2", trigger2.Name)
+
+       assert.NotNil(t, trigger2.Spec.Filter)
+       assert.Len(t, trigger2.Spec.Filter.Attributes, 1)
+       assert.Equal(t, "evt.type.2", trigger2.Spec.Filter.Attributes["type"])
+
+       trigger3 := environment.Resources.GetKnativeTrigger(func(trigger 
*eventing.Trigger) bool {
+               return trigger.Name == knative.GetTriggerName("default", 
"test", "")
+       })
+
+       assert.NotNil(t, trigger3)
+       assert.Equal(t, "default", trigger3.Spec.Broker)
+       assert.Equal(t, serving.SchemeGroupVersion.String(), 
trigger3.Spec.Subscriber.Ref.APIVersion)
+       assert.Equal(t, "Service", trigger3.Spec.Subscriber.Ref.Kind)
+       assert.Equal(t, "/events/", trigger3.Spec.Subscriber.URI.Path)
+       assert.Equal(t, "default-test", trigger3.Name)
+
+       assert.Nil(t, trigger3.Spec.Filter)
+}
+
+func TestKnativeMultipleTriggerAdditionalFilterConfig(t *testing.T) {
+       catalog, err := camel.DefaultCatalog()
+       require.NoError(t, err)
+
+       c, err := NewFakeClient("ns")
+       require.NoError(t, err)
+
+       traitCatalog := NewCatalog(c)
+
+       environment := Environment{
+               CamelCatalog: catalog,
+               Catalog:      traitCatalog,
+               Client:       c,
+               Integration: &v1.Integration{
+                       ObjectMeta: metav1.ObjectMeta{
+                               Name:      "test",
+                               Namespace: "ns",
+                       },
+                       Status: v1.IntegrationStatus{
+                               Phase: v1.IntegrationPhaseDeploying,
+                       },
+                       Spec: v1.IntegrationSpec{
+                               Profile: v1.TraitProfileKnative,
+                               Sources: []v1.SourceSpec{
+                                       {
+                                               DataSpec: v1.DataSpec{
+                                                       Name: "route.java",
+                                                       Content: `
+                                                               public class 
CartoonMessagesMover extends RouteBuilder {
+                                                                       public 
void configure() {
+                                                                               
from("knative:event/evt.type.1")
+                                                                               
        .log("${body}");
+
+                                                                               
from("knative:event/evt.type.2")
+                                                                               
        .log("${body}");
+
+                                                                               
from("knative:event")
+                                                                               
        .log("${body}");
+                                                                       }
+                                                               }
+                                                       `,
+                                               },
+                                               Language: v1.LanguageJavaSource,
+                                       },
+                               },
+                               Traits: v1.Traits{
+                                       Knative: &traitv1.KnativeTrait{
+                                               Trait: traitv1.Trait{
+                                                       Enabled: 
pointer.Bool(true),
+                                               },
+                                               Filters: 
[]string{"subject=Hello"},
+                                       },
+                               },
+                       },
+               },
+               IntegrationKit: &v1.IntegrationKit{
+                       Status: v1.IntegrationKitStatus{
+                               Phase: v1.IntegrationKitPhaseReady,
+                       },
+               },
+               Platform: &v1.IntegrationPlatform{
+                       Spec: v1.IntegrationPlatformSpec{
+                               Cluster: v1.IntegrationPlatformClusterOpenShift,
+                               Build: v1.IntegrationPlatformBuildSpec{
+                                       PublishStrategy: 
v1.IntegrationPlatformBuildPublishStrategyS2I,
+                                       Registry:        
v1.RegistrySpec{Address: "registry"},
+                                       RuntimeVersion:  
catalog.Runtime.Version,
+                               },
+                               Profile: v1.TraitProfileKnative,
+                       },
+                       Status: v1.IntegrationPlatformStatus{
+                               Phase: v1.IntegrationPlatformPhaseReady,
+                       },
+               },
+               EnvVars:        make([]corev1.EnvVar, 0),
+               ExecutedTraits: make([]Trait, 0),
+               Resources:      k8sutils.NewCollection(),
+       }
+       environment.Platform.ResyncStatusFullConfig()
+
+       // don't care about conditions in this unit test
+       _, err = traitCatalog.apply(&environment)
+
+       require.NoError(t, err)
+       assert.NotEmpty(t, environment.ExecutedTraits)
+       assert.NotNil(t, environment.GetTrait("knative"))
+
+       triggerNames := make([]string, 0)
+       environment.Resources.VisitKnativeTrigger(func(trigger 
*eventing.Trigger) {
+               triggerNames = append(triggerNames, trigger.Name)
+       })
+
+       assert.Len(t, triggerNames, 3)
+
+       trigger1 := environment.Resources.GetKnativeTrigger(func(trigger 
*eventing.Trigger) bool {
+               return trigger.Name == knative.GetTriggerName("default", 
"test", "evt.type.1")
+       })
+
+       assert.NotNil(t, trigger1)
+       assert.Equal(t, "default", trigger1.Spec.Broker)
+       assert.Equal(t, serving.SchemeGroupVersion.String(), 
trigger1.Spec.Subscriber.Ref.APIVersion)
+       assert.Equal(t, "Service", trigger1.Spec.Subscriber.Ref.Kind)
+       assert.Equal(t, "/events/evt.type.1", trigger1.Spec.Subscriber.URI.Path)
+       assert.Equal(t, "default-test-evttype1", trigger1.Name)
+
+       assert.NotNil(t, trigger1.Spec.Filter)
+       assert.Len(t, trigger1.Spec.Filter.Attributes, 2)
+       assert.Equal(t, "evt.type.1", trigger1.Spec.Filter.Attributes["type"])
+       assert.Equal(t, "Hello", trigger1.Spec.Filter.Attributes["subject"])
+
+       trigger2 := environment.Resources.GetKnativeTrigger(func(trigger 
*eventing.Trigger) bool {
+               return trigger.Name == knative.GetTriggerName("default", 
"test", "evt.type.2")
+       })
+
+       assert.NotNil(t, trigger2)
+       assert.Equal(t, "default", trigger2.Spec.Broker)
+       assert.Equal(t, serving.SchemeGroupVersion.String(), 
trigger2.Spec.Subscriber.Ref.APIVersion)
+       assert.Equal(t, "Service", trigger2.Spec.Subscriber.Ref.Kind)
+       assert.Equal(t, "/events/evt.type.2", trigger2.Spec.Subscriber.URI.Path)
+       assert.Equal(t, "default-test-evttype2", trigger2.Name)
+
+       assert.NotNil(t, trigger2.Spec.Filter)
+       assert.Len(t, trigger2.Spec.Filter.Attributes, 2)
+       assert.Equal(t, "evt.type.2", trigger2.Spec.Filter.Attributes["type"])
+       assert.Equal(t, "Hello", trigger2.Spec.Filter.Attributes["subject"])
+
+       trigger3 := environment.Resources.GetKnativeTrigger(func(trigger 
*eventing.Trigger) bool {
+               return trigger.Name == knative.GetTriggerName("default", 
"test", "")
+       })
+
+       assert.NotNil(t, trigger3)
+       assert.Equal(t, "default", trigger3.Spec.Broker)
+       assert.Equal(t, serving.SchemeGroupVersion.String(), 
trigger3.Spec.Subscriber.Ref.APIVersion)
+       assert.Equal(t, "Service", trigger3.Spec.Subscriber.Ref.Kind)
+       assert.Equal(t, "/events/", trigger3.Spec.Subscriber.URI.Path)
+       assert.Equal(t, "default-test", trigger3.Name)
+
+       assert.NotNil(t, trigger3.Spec.Filter)
+       assert.Len(t, trigger3.Spec.Filter.Attributes, 1)
+       assert.Equal(t, "Hello", trigger3.Spec.Filter.Attributes["subject"])
+}
+
+func TestKnativeTriggerNoEventType(t *testing.T) {
+       catalog, err := camel.DefaultCatalog()
+       require.NoError(t, err)
+
+       c, err := NewFakeClient("ns")
+       require.NoError(t, err)
+
+       traitCatalog := NewCatalog(c)
+
+       environment := Environment{
+               CamelCatalog: catalog,
+               Catalog:      traitCatalog,
+               Client:       c,
+               Integration: &v1.Integration{
+                       ObjectMeta: metav1.ObjectMeta{
+                               Name:      "test",
+                               Namespace: "ns",
+                       },
+                       Status: v1.IntegrationStatus{
+                               Phase: v1.IntegrationPhaseDeploying,
+                       },
+                       Spec: v1.IntegrationSpec{
+                               Profile: v1.TraitProfileKnative,
+                               Sources: []v1.SourceSpec{
+                                       {
+                                               DataSpec: v1.DataSpec{
+                                                       Name: "route.java",
+                                                       Content: `
+                                                               public class 
CartoonMessagesMover extends RouteBuilder {
+                                                                       public 
void configure() {
+                                                                               
from("knative:event")
+                                                                               
        .log("${body}");
+                                                                       }
+                                                               }
+                                                       `,
+                                               },
+                                               Language: v1.LanguageJavaSource,
+                                       },
+                               },
+                               Traits: v1.Traits{
+                                       Knative: &traitv1.KnativeTrait{
+                                               Trait: traitv1.Trait{
+                                                       Enabled: 
pointer.Bool(true),
+                                               },
+                                       },
+                               },
+                       },
+               },
+               IntegrationKit: &v1.IntegrationKit{
+                       Status: v1.IntegrationKitStatus{
+                               Phase: v1.IntegrationKitPhaseReady,
+                       },
+               },
+               Platform: &v1.IntegrationPlatform{
+                       Spec: v1.IntegrationPlatformSpec{
+                               Cluster: v1.IntegrationPlatformClusterOpenShift,
+                               Build: v1.IntegrationPlatformBuildSpec{
+                                       PublishStrategy: 
v1.IntegrationPlatformBuildPublishStrategyS2I,
+                                       Registry:        
v1.RegistrySpec{Address: "registry"},
+                                       RuntimeVersion:  
catalog.Runtime.Version,
+                               },
+                               Profile: v1.TraitProfileKnative,
+                       },
+                       Status: v1.IntegrationPlatformStatus{
+                               Phase: v1.IntegrationPlatformPhaseReady,
+                       },
+               },
+               EnvVars:        make([]corev1.EnvVar, 0),
+               ExecutedTraits: make([]Trait, 0),
+               Resources:      k8sutils.NewCollection(),
+       }
+       environment.Platform.ResyncStatusFullConfig()
+
+       // don't care about conditions in this unit test
+       _, err = traitCatalog.apply(&environment)
+
+       require.NoError(t, err)
+       assert.NotEmpty(t, environment.ExecutedTraits)
+       assert.NotNil(t, environment.GetTrait("knative"))
+
+       trigger := environment.Resources.GetKnativeTrigger(func(trigger 
*eventing.Trigger) bool {
+               return trigger.Name == knative.GetTriggerName("default", 
"test", "")
+       })
+
+       assert.NotNil(t, trigger)
+       assert.Equal(t, "default", trigger.Spec.Broker)
+       assert.Equal(t, serving.SchemeGroupVersion.String(), 
trigger.Spec.Subscriber.Ref.APIVersion)
+       assert.Equal(t, "Service", trigger.Spec.Subscriber.Ref.Kind)
+       assert.Equal(t, "/events/", trigger.Spec.Subscriber.URI.Path)
+       assert.Equal(t, "default-test", trigger.Name)
+
+       assert.Nil(t, trigger.Spec.Filter)
+}
+
+func TestKnativeTriggerNoServingAvailable(t *testing.T) {
        catalog, err := camel.DefaultCatalog()
        require.NoError(t, err)
 
@@ -448,17 +1118,21 @@ func TestKnativeTriggerConfigurationNoServingAvailable(t 
*testing.T) {
        assert.NotEmpty(t, environment.ExecutedTraits)
        assert.NotNil(t, environment.GetTrait("knative"))
 
-       assert.NotNil(t, environment.Resources.GetKnativeTrigger(func(trigger 
*eventing.Trigger) bool {
-               matching := true
+       trigger := environment.Resources.GetKnativeTrigger(func(trigger 
*eventing.Trigger) bool {
+               return trigger.Name == knative.GetTriggerName("default", 
"test", "evt.type")
+       })
 
-               matching = matching && assert.Equal(t, "default", 
trigger.Spec.Broker)
-               matching = matching && assert.Equal(t, "v1", 
trigger.Spec.Subscriber.Ref.APIVersion)
-               matching = matching && assert.Equal(t, "Service", 
trigger.Spec.Subscriber.Ref.Kind)
-               matching = matching && assert.Equal(t, "/events/evt.type", 
trigger.Spec.Subscriber.URI.Path)
-               matching = matching && assert.Equal(t, "default-test-evttype", 
trigger.Name)
+       assert.NotNil(t, trigger)
 
-               return matching
-       }))
+       assert.Equal(t, "default", trigger.Spec.Broker)
+       assert.Equal(t, "v1", trigger.Spec.Subscriber.Ref.APIVersion)
+       assert.Equal(t, "Service", trigger.Spec.Subscriber.Ref.Kind)
+       assert.Equal(t, "/events/evt.type", trigger.Spec.Subscriber.URI.Path)
+       assert.Equal(t, "default-test-evttype", trigger.Name)
+
+       assert.NotNil(t, trigger.Spec.Filter)
+       assert.Len(t, trigger.Spec.Filter.Attributes, 1)
+       assert.Equal(t, "evt.type", trigger.Spec.Filter.Attributes["type"])
 }
 
 func TestKnativePlatformHttpConfig(t *testing.T) {
diff --git a/pkg/util/bindings/bindings_test.go 
b/pkg/util/bindings/bindings_test.go
index 7cfa78c86..0cd7e3516 100644
--- a/pkg/util/bindings/bindings_test.go
+++ b/pkg/util/bindings/bindings_test.go
@@ -125,6 +125,12 @@ func TestBindings(t *testing.T) {
                                }),
                        },
                        uri: 
"knative:event/myeventtype?apiVersion=eventing.knative.dev%2Fv1&kind=Broker&name=default",
+                       traits: v1.Traits{
+                               Knative: &traitv1.KnativeTrait{
+                                       Filters:         
[]string{"type=myeventtype"},
+                                       FilterEventType: pointer.Bool(true),
+                               },
+                       },
                },
                {
                        endpointType: v1.EndpointTypeSource,
diff --git a/pkg/util/bindings/knative_ref.go b/pkg/util/bindings/knative_ref.go
index 301e62099..cfd48fa0c 100644
--- a/pkg/util/bindings/knative_ref.go
+++ b/pkg/util/bindings/knative_ref.go
@@ -24,7 +24,9 @@ import (
 
        v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
        knativeapis "github.com/apache/camel-k/v2/pkg/apis/camel/v1/knative"
-       v1alpha1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1alpha1"
+       "github.com/apache/camel-k/v2/pkg/apis/camel/v1/trait"
+       "github.com/apache/camel-k/v2/pkg/apis/camel/v1alpha1"
+       "github.com/apache/camel-k/v2/pkg/util/property"
        corev1 "k8s.io/api/core/v1"
        "k8s.io/apimachinery/pkg/runtime/schema"
 
@@ -42,8 +44,6 @@ func (k KnativeRefBindingProvider) ID() string {
 }
 
 // Translate --.
-//
-//nolint:dupl
 func (k KnativeRefBindingProvider) Translate(ctx BindingContext, endpointCtx 
EndpointContext, e v1.Endpoint) (*Binding, error) {
        if e.Ref == nil {
                // works only on refs
@@ -78,13 +78,9 @@ func (k KnativeRefBindingProvider) Translate(ctx 
BindingContext, endpointCtx End
        if props == nil {
                props = make(map[string]string)
        }
-       if props["apiVersion"] == "" {
-               props["apiVersion"] = e.Ref.APIVersion
-       }
-       if props["kind"] == "" {
-               props["kind"] = e.Ref.Kind
-       }
 
+       var filterEventType = true
+       var filterExpressions = make([]string, 0)
        var serviceURI string
 
        // TODO: refactor
@@ -93,26 +89,63 @@ func (k KnativeRefBindingProvider) Translate(ctx 
BindingContext, endpointCtx End
                if props["name"] == "" {
                        props["name"] = e.Ref.Name
                }
+
+               if endpointCtx.Type == v1.EndpointTypeSource {
+                       // Configure trigger filter attributes for the Knative 
event source
+                       for key, value := range props {
+                               if key == "cloudEventsType" {
+                                       // cloudEventsType is a synonym for 
type filter attribute
+                                       filterExpressions = 
append(filterExpressions, fmt.Sprintf("type=%s", value))
+                               } else if key != "name" {
+                                       filterExpressions = 
append(filterExpressions, fmt.Sprintf("%s=%s", key, value))
+                               }
+                       }
+               }
+
                if eventType, ok := props["type"]; ok {
-                       // consume prop
+                       // consume the type property and set it as URI path 
parameter
                        delete(props, "type")
                        serviceURI = fmt.Sprintf("knative:%s/%s", *serviceType, 
eventType)
+               } else if cloudEventsType, found := props["cloudEventsType"]; 
found && endpointCtx.Type == v1.EndpointTypeSource {
+                       // set the cloud events type as URI path parameter, but 
keep it also as URI query param
+                       serviceURI = fmt.Sprintf("knative:%s/%s", *serviceType, 
cloudEventsType)
                } else {
-                       if endpointCtx.Type == v1.EndpointTypeSink || 
endpointCtx.Type == v1.EndpointTypeAction {
-                               // Allowing no event type, but it can fail. See 
https://github.com/apache/camel-k/v2-runtime/issues/536
-                               serviceURI = fmt.Sprintf("knative:%s", 
*serviceType)
-                       } else {
-                               return nil, errors.New(`property "type" must be 
provided when reading from the Broker`)
-                       }
+                       filterEventType = false
+                       serviceURI = fmt.Sprintf("knative:%s", *serviceType)
                }
        } else {
                serviceURI = fmt.Sprintf("knative:%s/%s", *serviceType, 
url.PathEscape(e.Ref.Name))
        }
 
+       // Remove filter attributes from props to avoid adding them to the 
service URI query params
+       for _, exp := range filterExpressions {
+               key, _ := property.SplitPropertyFileEntry(exp)
+               delete(props, key)
+       }
+
+       // Enrich service URI query params if not set
+       if props["apiVersion"] == "" {
+               props["apiVersion"] = e.Ref.APIVersion
+       }
+       if props["kind"] == "" {
+               props["kind"] = e.Ref.Kind
+       }
+
        serviceURI = uri.AppendParameters(serviceURI, props)
-       return &Binding{
+       var binding = Binding{
                URI: serviceURI,
-       }, nil
+       }
+
+       if endpointCtx.Type == v1.EndpointTypeSource && (len(filterExpressions) 
> 0 || !filterEventType) {
+               binding.Traits = v1.Traits{
+                       Knative: &trait.KnativeTrait{
+                               Filters:         filterExpressions,
+                               FilterEventType: &filterEventType,
+                       },
+               }
+       }
+
+       return &binding, nil
 }
 
 func isKnownKnativeResource(ref *corev1.ObjectReference) (bool, error) {
@@ -149,8 +182,6 @@ func (k V1alpha1KnativeRefBindingProvider) ID() string {
 
 // Translate --.
 // Deprecated.
-//
-//nolint:dupl
 func (k V1alpha1KnativeRefBindingProvider) Translate(ctx 
V1alpha1BindingContext, endpointCtx V1alpha1EndpointContext, e 
v1alpha1.Endpoint) (*Binding, error) {
        if e.Ref == nil {
                // works only on refs
diff --git a/pkg/util/bindings/knative_ref_test.go 
b/pkg/util/bindings/knative_ref_test.go
index 6ddadad49..18e482090 100644
--- a/pkg/util/bindings/knative_ref_test.go
+++ b/pkg/util/bindings/knative_ref_test.go
@@ -22,20 +22,170 @@ import (
        "fmt"
        "testing"
 
-       "github.com/apache/camel-k/v2/pkg/util/test"
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
        v1 "k8s.io/api/core/v1"
+       "k8s.io/utils/pointer"
 
        camelv1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
+       "github.com/apache/camel-k/v2/pkg/util/test"
 )
 
-func TestKnativeRefBinding(t *testing.T) {
+func TestKnativeRefAsSource(t *testing.T) {
        testcases := []struct {
+               name            string
+               endpoint        camelv1.Endpoint
+               uri             string
+               filters         []string
+               filterEventType *bool
+       }{
+               {
+                       name: "broker",
+                       endpoint: camelv1.Endpoint{
+                               Ref: &v1.ObjectReference{
+                                       Kind:       "Broker",
+                                       Name:       "default",
+                                       APIVersion: "eventing.knative.dev/v1",
+                               },
+                       },
+                       uri:             
"knative:event?apiVersion=eventing.knative.dev%2Fv1&kind=Broker&name=default",
+                       filterEventType: pointer.Bool(false),
+               },
+               {
+                       name: "broker-name-property",
+                       endpoint: camelv1.Endpoint{
+                               Ref: &v1.ObjectReference{
+                                       Kind:       "Broker",
+                                       Name:       "default",
+                                       APIVersion: "eventing.knative.dev/v1",
+                               },
+                               Properties: 
asEndpointProperties(map[string]string{"name": "my-broker"}),
+                       },
+                       uri:             
"knative:event?apiVersion=eventing.knative.dev%2Fv1&kind=Broker&name=my-broker",
+                       filterEventType: pointer.Bool(false),
+               },
+               {
+                       name: "event-type-filter",
+                       endpoint: camelv1.Endpoint{
+                               Ref: &v1.ObjectReference{
+                                       Kind:       "Broker",
+                                       Name:       "default",
+                                       APIVersion: "eventing.knative.dev/v1",
+                               },
+                               Properties: 
asEndpointProperties(map[string]string{"type": "org.apache.camel.myevent"}),
+                       },
+                       uri:     
"knative:event/org.apache.camel.myevent?apiVersion=eventing.knative.dev%2Fv1&kind=Broker&name=default",
+                       filters: []string{"type=org.apache.camel.myevent"},
+               },
+               {
+                       name: "cloud-events-type-filter",
+                       endpoint: camelv1.Endpoint{
+                               Ref: &v1.ObjectReference{
+                                       Kind:       "Broker",
+                                       Name:       "default",
+                                       APIVersion: "eventing.knative.dev/v1",
+                               },
+                               Properties: 
asEndpointProperties(map[string]string{"cloudEventsType": 
"org.apache.camel.cloudevent"}),
+                       },
+                       uri:     
"knative:event/org.apache.camel.cloudevent?apiVersion=eventing.knative.dev%2Fv1&cloudEventsType=org.apache.camel.cloudevent&kind=Broker&name=default",
+                       filters: []string{"type=org.apache.camel.cloudevent"},
+               },
+               {
+                       name: "event-filters",
+                       endpoint: camelv1.Endpoint{
+                               Ref: &v1.ObjectReference{
+                                       Kind:       "Broker",
+                                       Name:       "default",
+                                       APIVersion: "eventing.knative.dev/v1",
+                               },
+                               Properties: 
asEndpointProperties(map[string]string{"source": "my-source", "subject": 
"mySubject"}),
+                       },
+                       uri:             
"knative:event?apiVersion=eventing.knative.dev%2Fv1&kind=Broker&name=default",
+                       filters:         []string{"source=my-source", 
"subject=mySubject"},
+                       filterEventType: pointer.Bool(false),
+               },
+               {
+                       name: "event-extension-filters",
+                       endpoint: camelv1.Endpoint{
+                               Ref: &v1.ObjectReference{
+                                       Kind:       "Broker",
+                                       Name:       "default",
+                                       APIVersion: "eventing.knative.dev/v1",
+                               },
+                               Properties: 
asEndpointProperties(map[string]string{"myextension": "foo"}),
+                       },
+                       uri:             
"knative:event?apiVersion=eventing.knative.dev%2Fv1&kind=Broker&name=default",
+                       filters:         []string{"myextension=foo"},
+                       filterEventType: pointer.Bool(false),
+               },
+               {
+                       name: "channel",
+                       endpoint: camelv1.Endpoint{
+                               Ref: &v1.ObjectReference{
+                                       Kind:       "Channel",
+                                       Name:       "mychannel",
+                                       APIVersion: "messaging.knative.dev/v1",
+                               },
+                       },
+                       uri: 
"knative:channel/mychannel?apiVersion=messaging.knative.dev%2Fv1&kind=Channel",
+               },
+               {
+                       name: "service",
+                       endpoint: camelv1.Endpoint{
+                               Ref: &v1.ObjectReference{
+                                       Kind:       "Service",
+                                       Name:       "myservice",
+                                       APIVersion: "serving.knative.dev/v1",
+                               },
+                       },
+                       uri: 
"knative:endpoint/myservice?apiVersion=serving.knative.dev%2Fv1&kind=Service",
+               },
+       }
+
+       for i, tc := range testcases {
+               t.Run(fmt.Sprintf("test-%d-%s", i, tc.name), func(t *testing.T) 
{
+                       ctx, cancel := context.WithCancel(context.Background())
+                       defer cancel()
+
+                       client, err := test.NewFakeClient()
+                       require.NoError(t, err)
+
+                       bindingContext := BindingContext{
+                               Ctx:       ctx,
+                               Client:    client,
+                               Namespace: "test",
+                               Profile:   camelv1.TraitProfileKnative,
+                       }
+
+                       binding, err := 
KnativeRefBindingProvider{}.Translate(bindingContext, EndpointContext{
+                               Type: camelv1.EndpointTypeSource,
+                       }, tc.endpoint)
+                       require.NoError(t, err)
+                       assert.NotNil(t, binding)
+                       assert.Equal(t, tc.uri, binding.URI)
+
+                       if tc.filters != nil || 
!pointer.BoolDeref(tc.filterEventType, true) {
+                               assert.NotNil(t, binding.Traits.Knative)
+                               assert.Len(t, binding.Traits.Knative.Filters, 
len(tc.filters))
+
+                               for _, filterExpression := range tc.filters {
+                                       assert.Contains(t, 
binding.Traits.Knative.Filters, filterExpression)
+                               }
+
+                               assert.Equal(t, 
pointer.BoolDeref(binding.Traits.Knative.FilterEventType, true), 
pointer.BoolDeref(tc.filterEventType, true))
+                       }
+               })
+       }
+}
+
+func TestKnativeRefAsSink(t *testing.T) {
+       testcases := []struct {
+               name     string
                endpoint camelv1.Endpoint
                uri      string
        }{
                {
+                       name: "broker",
                        endpoint: camelv1.Endpoint{
                                Ref: &v1.ObjectReference{
                                        Kind:       "Broker",
@@ -46,6 +196,31 @@ func TestKnativeRefBinding(t *testing.T) {
                        uri: 
"knative:event?apiVersion=eventing.knative.dev%2Fv1&kind=Broker&name=default",
                },
                {
+                       name: "broker-name-property",
+                       endpoint: camelv1.Endpoint{
+                               Ref: &v1.ObjectReference{
+                                       Kind:       "Broker",
+                                       Name:       "default",
+                                       APIVersion: "eventing.knative.dev/v1",
+                               },
+                               Properties: 
asEndpointProperties(map[string]string{"name": "my-broker"}),
+                       },
+                       uri: 
"knative:event?apiVersion=eventing.knative.dev%2Fv1&kind=Broker&name=my-broker",
+               },
+               {
+                       name: "event-type",
+                       endpoint: camelv1.Endpoint{
+                               Ref: &v1.ObjectReference{
+                                       Kind:       "Broker",
+                                       Name:       "default",
+                                       APIVersion: "eventing.knative.dev/v1",
+                               },
+                               Properties: 
asEndpointProperties(map[string]string{"type": "org.apache.camel.myevent"}),
+                       },
+                       uri: 
"knative:event/org.apache.camel.myevent?apiVersion=eventing.knative.dev%2Fv1&kind=Broker&name=default",
+               },
+               {
+                       name: "channel",
                        endpoint: camelv1.Endpoint{
                                Ref: &v1.ObjectReference{
                                        Kind:       "Channel",
@@ -56,6 +231,7 @@ func TestKnativeRefBinding(t *testing.T) {
                        uri: 
"knative:channel/mychannel?apiVersion=messaging.knative.dev%2Fv1&kind=Channel",
                },
                {
+                       name: "service",
                        endpoint: camelv1.Endpoint{
                                Ref: &v1.ObjectReference{
                                        Kind:       "Service",
@@ -68,7 +244,7 @@ func TestKnativeRefBinding(t *testing.T) {
        }
 
        for i, tc := range testcases {
-               t.Run(fmt.Sprintf("test-%d-%s", i, tc.endpoint.Ref.Kind), 
func(t *testing.T) {
+               t.Run(fmt.Sprintf("test-%d-%s", i, tc.name), func(t *testing.T) 
{
                        ctx, cancel := context.WithCancel(context.Background())
                        defer cancel()
 
diff --git a/pkg/util/knative/knative.go b/pkg/util/knative/knative.go
index b72ae1024..e6623ea1f 100644
--- a/pkg/util/knative/knative.go
+++ b/pkg/util/knative/knative.go
@@ -76,47 +76,36 @@ func CreateSubscription(channelReference 
corev1.ObjectReference, serviceName str
 }
 
 // CreateServiceTrigger create Knative trigger with arbitrary Kubernetes 
Service as a subscriber - usually used when no Knative Serving is available on 
the cluster.
-func CreateServiceTrigger(brokerReference corev1.ObjectReference, serviceName 
string, eventType string, path string) (*eventing.Trigger, error) {
+func CreateServiceTrigger(brokerReference corev1.ObjectReference, serviceName 
string, eventType string, path string, attributes map[string]string) 
(*eventing.Trigger, error) {
        subscriberRef := duckv1.KReference{
                APIVersion: "v1",
                Kind:       "Service",
                Name:       serviceName,
        }
-       return CreateTrigger(brokerReference, subscriberRef, eventType, path)
+       return CreateTrigger(brokerReference, subscriberRef, eventType, path, 
attributes)
 }
 
 // CreateKnativeServiceTrigger create Knative trigger with Knative Serving 
Service as a subscriber - default option when Knative Serving is available on 
the cluster.
-func CreateKnativeServiceTrigger(brokerReference corev1.ObjectReference, 
serviceName string, eventType string, path string) (*eventing.Trigger, error) {
+func CreateKnativeServiceTrigger(brokerReference corev1.ObjectReference, 
serviceName string, eventType string, path string, attributes 
map[string]string) (*eventing.Trigger, error) {
        subscriberRef := duckv1.KReference{
                APIVersion: serving.SchemeGroupVersion.String(),
                Kind:       "Service",
                Name:       serviceName,
        }
-       return CreateTrigger(brokerReference, subscriberRef, eventType, path)
+       return CreateTrigger(brokerReference, subscriberRef, eventType, path, 
attributes)
 }
 
-func CreateTrigger(brokerReference corev1.ObjectReference, subscriberRef 
duckv1.KReference, eventType string, path string) (*eventing.Trigger, error) {
-       nameSuffix := ""
-       var attributes map[string]string
-       if eventType != "" {
-               nameSuffix = fmt.Sprintf("-%s", util.SanitizeLabel(eventType))
-               attributes = map[string]string{
-                       "type": eventType,
-               }
-       }
-       return &eventing.Trigger{
+func CreateTrigger(brokerReference corev1.ObjectReference, subscriberRef 
duckv1.KReference, eventType string, path string, attributes map[string]string) 
(*eventing.Trigger, error) {
+       trigger := eventing.Trigger{
                TypeMeta: metav1.TypeMeta{
                        APIVersion: eventing.SchemeGroupVersion.String(),
                        Kind:       "Trigger",
                },
                ObjectMeta: metav1.ObjectMeta{
                        Namespace: brokerReference.Namespace,
-                       Name:      brokerReference.Name + "-" + 
subscriberRef.Name + nameSuffix,
+                       Name:      GetTriggerName(brokerReference.Name, 
subscriberRef.Name, eventType),
                },
                Spec: eventing.TriggerSpec{
-                       Filter: &eventing.TriggerFilter{
-                               Attributes: attributes,
-                       },
                        Broker: brokerReference.Name,
                        Subscriber: duckv1.Destination{
                                Ref: &subscriberRef,
@@ -125,7 +114,24 @@ func CreateTrigger(brokerReference corev1.ObjectReference, 
subscriberRef duckv1.
                                },
                        },
                },
-       }, nil
+       }
+
+       if len(attributes) > 0 {
+               trigger.Spec.Filter = &eventing.TriggerFilter{
+                       Attributes: attributes,
+               }
+       }
+
+       return &trigger, nil
+}
+
+func GetTriggerName(brokerName string, subscriberName string, eventType 
string) string {
+       nameSuffix := ""
+       if eventType != "" {
+               nameSuffix = fmt.Sprintf("-%s", util.SanitizeLabel(eventType))
+       }
+
+       return brokerName + "-" + subscriberName + nameSuffix
 }
 
 func CreateSinkBinding(source corev1.ObjectReference, target 
corev1.ObjectReference) *sources.SinkBinding {
@@ -203,7 +209,7 @@ func getSinkURI(ctx context.Context, c client.Client, sink 
*corev1.ObjectReferen
        }
 
        objIdentifier := fmt.Sprintf("\"%s/%s\" (%s)", u.GetNamespace(), 
u.GetName(), u.GroupVersionKind())
-       // Special case v1/Service to allow it be addressable
+       // Special case v1/Service allowing it to be addressable
        if u.GroupVersionKind().Kind == "Service" && u.GroupVersionKind().Group 
== "" && u.GroupVersionKind().Version == "v1" {
                return fmt.Sprintf("http://%s.%s.svc/";, u.GetName(), 
u.GetNamespace()), nil
        }
diff --git a/pkg/util/knative/uri.go b/pkg/util/knative/uri.go
index 73f4fb2b2..f128d5e54 100644
--- a/pkg/util/knative/uri.go
+++ b/pkg/util/knative/uri.go
@@ -84,7 +84,7 @@ func ExtractObjectReference(uri string) (v1.ObjectReference, 
error) {
        }, nil
 }
 
-// ExtractEventType extract the eventType from a event URI.
+// ExtractEventType extract the eventType from an event URI.
 func ExtractEventType(uri string) string {
        return matchOrEmpty(uriRegexp, 2, uri)
 }
diff --git a/script/Makefile b/script/Makefile
index 215df7dd3..d8290f535 100644
--- a/script/Makefile
+++ b/script/Makefile
@@ -340,7 +340,7 @@ test-install-upgrade: do-build
        go test -timeout 30m -v ./e2e/install/upgrade -tags=integration 
$(TEST_INSTALL_RUN) $(GOTESTFMT)
 
 #
-# Knative tests that require the presence of KNative configuration
+# Knative tests that require the presence of Knative configuration
 #
 test-knative: do-build
        STAGING_RUNTIME_REPO="$(STAGING_RUNTIME_REPO)"; \

Reply via email to