This is an automated email from the ASF dual-hosted git repository. pcongiusti pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-k.git
The following commit(s) were added to refs/heads/main by this push: new 3460abd56 chore(doc): kamelets reorganization 3460abd56 is described below commit 3460abd56020bd0da402160d46fe9c753b8087d2 Author: Pasquale Congiusti <pasquale.congiu...@gmail.com> AuthorDate: Thu Sep 19 10:55:49 2024 +0200 chore(doc): kamelets reorganization --- docs/modules/ROOT/nav.adoc | 6 +- docs/modules/ROOT/pages/kamelets/kamelets-dev.adoc | 1747 -------------------- .../modules/ROOT/pages/kamelets/kamelets-user.adoc | 721 +------- docs/modules/ROOT/pages/kamelets/kamelets.adoc | 4 +- docs/modules/ROOT/pages/kamelets/keda.adoc | 109 ++ .../error-handler.adoc} | 15 +- docs/modules/ROOT/pages/pipes/pipes.adoc | 375 ++++- 7 files changed, 497 insertions(+), 2480 deletions(-) diff --git a/docs/modules/ROOT/nav.adoc b/docs/modules/ROOT/nav.adoc index 6b2093895..e54685ca8 100644 --- a/docs/modules/ROOT/nav.adoc +++ b/docs/modules/ROOT/nav.adoc @@ -22,6 +22,7 @@ ** xref:running/promoting.adoc[Promote an Integration] * xref:pipes/pipes.adoc[Run an Pipe] ** xref:pipes/bind-cli.adoc[kamel bind CLI] +** xref:pipes/error-handler.adoc[Error Handler] ** xref:pipes/promoting.adoc[Promote a Pipe] * xref:languages/languages.adoc[Languages] ** xref:languages/java.adoc[Java] @@ -84,9 +85,8 @@ // End of autogenerated code - DO NOT EDIT! (trait-nav) * xref:kamelets/kamelets.adoc[Kamelets] ** xref:kamelets/kamelets-distribution.adoc[Distribution] -** xref:kamelets/kamelets-user.adoc[User Guide] -** xref:kamelets/kamelets-dev.adoc[Developer Guide] -** xref:kamelets/kameletbindings-error-handler.adoc[Error Handling] +** xref:kamelets/kamelets-user.adoc[Configuration] +** xref:kamelets/keda.adoc[KEDA] * xref:pipeline/pipeline.adoc[Pipelines] ** xref:pipeline/external.adoc[External CICD] * Scaling diff --git a/docs/modules/ROOT/pages/kamelets/kamelets-dev.adoc b/docs/modules/ROOT/pages/kamelets/kamelets-dev.adoc deleted file mode 100644 index c617c4780..000000000 --- a/docs/modules/ROOT/pages/kamelets/kamelets-dev.adoc +++ /dev/null @@ -1,1747 +0,0 @@ -[[kamelets-developer-guide]] -= Kamelets Developer Guide - -[[kamelets-dev-introduction]] -== Introduction - -This document guides you through the process of developing a new Kamelet that can be used by any Apache Camel subproject supporting the -Kamelet technology stack and shared with others via Kamelet catalogs, such as the official the Apache Camel xref:camel-kamelets::index.adoc[Kamelet Catalog]. - -We assume that the reader is familiar with the content of the xref:kamelets/kamelets-user.adoc[Kamelets User Guide] and with -Camel K xref:installation/installation.adoc[installation] and general usage. - -== Basics - -If you started to learn a bit about Kamelets, you've seen that they can be used to create two kinds of connectors: - -- *Sources*: they produce data that can be injected into a destination -- *Sinks*: they consume data and optionally produce a response - -When creating a new Kamelet, you should first decide first which kind of Kamelet you're going to create, which depends on the use case you've in mind. -A Kamelet does a **single thing**, so if you want to provide both a source and a sink for your system, they are two Kamelets. - -In its essence, a Kamelet consists of a *single YAML file* that contains information on two distinct aspects of the connector: - -- *User view*: this part contains general documentation about the Kamelet, covering also the parameters that need to be configured in order to use it -- *Runtime aspects*: this part tells the Camel runtime how to implement what the Kamelet promises to do. Most of the times it contains a Camel route template in YAML DSL - -NOTE: We're ignoring here the part around data types of a Kamelet, which is not fundamental for the Kamelet to work and it is still subject to change - -We'll guide you through the process of creating a simple Kamelet by remapping a Camel component, then we'll go through a much more complicated real-world example. - -== Creating a simple Kamelet - -Since Apache Camel provides more than 300 components out of the box, it's easy to create a Kamelet starting from one of the components already available. -Most of the Kamelets available in the official catalog, in fact, are simple ones that contain only a remapping of the Kamelet properties into Camel endpoint parameters. -We're going to show an example shortly. - -Suppose that you want to provide a Kamelet that allows users to search data on Twitter, providing a stream of information about a given keyword. -Creating such a Kamelet is a fairly easy task: we can use options of the "camel-twitter" component without adding much processing logic. - -So the procedure of writing a simple Kamelet starts with scaffolding a new Kamelet resource, which can be done with the Camel JBang CLI (`camel`): - -[source] ----- -camel init twitter-search-source.kamelet.yaml ----- - -This produces a YAML file like the following one: - -.twitter-search-source.kamelet.yaml -[source,yaml] ----- -apiVersion: camel.apache.org/v1 -kind: Kamelet -metadata: - name: twitter-search-source - labels: - camel.apache.org/kamelet.type: "source" -spec: - definition: - title: "Timer" - description: "Produces periodic events with a custom payload" - required: - - message - properties: - period: - title: Period - description: The time interval between two events - type: integer - default: 1000 - message: - title: Message - description: The message to generate - type: string - dataTypes: - out: - default: text - types: - text: - mediaType: text/plain - template: - from: - uri: timer:tick - parameters: - period: "{{period}}" - steps: - - setBody: - constant: "{{message}}" - - to: "kamelet:sink" ----- - -We need to change the file to do what we want to achieve, that is, creating a route that searches a given keyword on Twitter. - -The route provided in the initial scaffold (timer-to-log) is not what we need, so we change it to the following: - -.twitter-search-source.kamelet.yaml -[source,yaml] ----- -apiVersion: camel.apache.org/v1 -kind: Kamelet -# ... -spec: -# ... - template: - from: - uri: "twitter-search:{{keywords}}" # <1> - parameters: - accessToken: "{{accessToken}}" # <2> - accessTokenSecret: "{{accessTokenSecret}}" - consumerKey: "{{apiKey}}" # <3> - consumerSecret: "{{apiKeySecret}}" - steps: - - marshal: # <4> - json: {} - - to: "kamelet:sink" # <5> ----- -<1> `keywords` is a path parameter in xref:components::twitter-search-component.adoc[Camel Twitter-search] -<2> Some endpoint parameters are just mapped 1-1 -<3> The Camel component `consumerKey` is named `apiKey` to reflect the actual name in the Twitter developer portal -<4> The Camel Twitter component generates Java objects, so we marshal them to JSON -<5> A Source Kamelet sends data to the special endpoint "kamelet:sink", that will be replaced at runtime by a different target - -The YAML route template above just uses the `twitter-search` component to do searches on Twitter. We added a marshalling step to JSON -because the output of a Kamelet needs always to be something that can be transferred over the wire. - -The Kamelet is almost complete, we just need to document the parameters in a JSON schema format. -We specify it in the `spec` -> `definition` part: - -.twitter-search-source.kamelet.yaml -[source,yaml] ----- -apiVersion: camel.apache.org/v1 -kind: Kamelet -metadata: - name: twitter-search-source -# ... -spec: - definition: - title: "Twitter Search Source" # <1> - description: |- - Allows to get all tweets on particular keywords from Twitter. - - It requires tokens that can be obtained by creating an application - in the Twitter developer portal: https://developer.twitter.com/. - required: # <2> - - keywords - - apiKey - - apiKeySecret - - accessToken - - accessTokenSecret - properties: - keywords: # <3> - title: Keywords - description: The keywords to use in the Twitter search (Supports Twitter standard operators) - type: string - example: "Apache Camel" - apiKey: - title: API Key - description: The API Key from the Twitter application in the developer portal - type: string - format: password - x-descriptors: - - urn:alm:descriptor:com.tectonic.ui:password # <4> - apiKeySecret: - title: API Key Secret - description: The API Key Secret from the Twitter application in the developer portal - type: string - format: password - x-descriptors: - - urn:alm:descriptor:com.tectonic.ui:password - accessToken: - title: Access Token - description: The Access Token from the Twitter application in the developer portal - type: string - format: password - x-descriptors: - - urn:alm:descriptor:com.tectonic.ui:password - accessTokenSecret: - title: Access Token Secret - description: The Access Token Secret from the Twitter application in the developer portal - type: string - format: password - x-descriptors: - - urn:alm:descriptor:com.tectonic.ui:password -# ... ----- -<1> General information about the Kamelet itself in textual format -<2> List of required parameters -<3> A specification for each one of the parameters (flat structure, no nested options allowed) -<4> Optional graphical customization for a specific UI (OpenShift Console) - -This is all you need to create a Kamelet so that other users can leverage it. There are a few things remaining, like setting information about -the generated objects and other metadata (like the icon and the provider and you're done). The final Kamelet can look like the following: - -.twitter-search-source.kamelet.yaml -[source,yaml] ----- -apiVersion: camel.apache.org/v1 -kind: Kamelet -metadata: - name: twitter-search-source - annotations: - camel.apache.org/kamelet.icon: "data:image/svg+xml;base64,..." # Truncated <1> - camel.apache.org/provider: "Apache Software Foundation" - labels: - camel.apache.org/kamelet.type: "source" - camel.apache.org/kamelet.group: "Twitter" -spec: - definition: - title: "Twitter Search Source" - description: |- - Allows to get all tweets on particular keywords from Twitter. - - It requires tokens that can be obtained by creating an application - in the Twitter developer portal: https://developer.twitter.com/. - required: - - keywords - - apiKey - - apiKeySecret - - accessToken - - accessTokenSecret - properties: - keywords: - title: Keywords - description: The keywords to use in the Twitter search (Supports Twitter standard operators) - type: string - example: "Apache Camel" - apiKey: - title: API Key - description: The API Key from the Twitter application in the developer portal - type: string - format: password - x-descriptors: - - urn:alm:descriptor:com.tectonic.ui:password - apiKeySecret: - title: API Key Secret - description: The API Key Secret from the Twitter application in the developer portal - type: string - format: password - x-descriptors: - - urn:alm:descriptor:com.tectonic.ui:password - accessToken: - title: Access Token - description: The Access Token from the Twitter application in the developer portal - type: string - format: password - x-descriptors: - - urn:alm:descriptor:com.tectonic.ui:password - accessTokenSecret: - title: Access Token Secret - description: The Access Token Secret from the Twitter application in the developer portal - type: string - format: password - x-descriptors: - - urn:alm:descriptor:com.tectonic.ui:password - dataTypes: # <2> - out: - default: json - types: - json: - mediaType: application/json - template: # <3> - from: - uri: "twitter-search:{{keywords}}" - parameters: - accessToken: "{{accessToken}}" - accessTokenSecret: "{{accessTokenSecret}}" - consumerKey: "{{apiKey}}" - consumerSecret: "{{apiKeySecret}}" - steps: - - marshal: - json: {} - - to: "kamelet:sink" ----- -<1> An icon with an appropriate license, better using svg+base64 URL encoding. You can encode icons using services like https://dopiaza.org/tools/datauri/index.php[this one] -<2> The dataTypes section indicates that the Kamelet is going to produce JSON data as a default. The Kamelet is able to define multiple data types for in/out/error. The user will then be able to choose on of the data types in a Pipe when referencing the Kamelet. -<3> The previous YAML flow - -The Kamelet can be shared on the Catalog and or created on a Kubernetes cluster to let users use it. - -=== Trying it out - -A simple way to try it out is to apply it on a cluster, together with a simple binding. -Assuming that you have a Kubernetes cluster and you're connected to a namespace where the Camel K operator can act, just create the Kamelet: - -[source] ----- -kubectl apply -f twitter-search-source.kamelet.yaml ----- - -Then you can create a binding like the following one to try it out: - -.twitter-search-source-binding.yaml -[source,yaml] ----- -apiVersion: camel.apache.org/v1 -kind: Pipe -metadata: - name: twitter-search-source-binding -spec: - source: - ref: - kind: Kamelet - apiVersion: camel.apache.org/v1 - name: twitter-search-source - properties: - keywords: "Apache Camel" - apiKey: "your own" - apiKeySecret: "your own" - accessToken: "your own" - accessTokenSecret: "your own" - sink: - uri: "log:info" ----- - -This can be created using: - -[source] ----- -kubectl apply -f twitter-search-source-binding.yaml ----- - -Once created, you can see the logs of the binding using: - -[source] ----- -kamel logs twitter-search-source-binding ----- - -If everything goes right, you should get some tweets in the logs after the integration is created. - -Refer to the xref:kamelets/kamelets-user.adoc[Kamelets User Guide] for more information on how to use it in different contexts (like Knative, Kafka, etc.). - -== Kamelet versions - -The catalog containing a set of Kamelets is generally developed in order to be used with a given Camel version (see the Apache Camel Kamelets catalog). However, when publishing the Kamelet to the cluster you may want to maintain more than one version for any reason (ie, to use a different dependency and be able to support multiple runtimes). You can therefore use the `.spec.versions` parameter to optionally maintain a set of alternative versions beside the main (and default) one. - -.my-timer-source.yaml -[source,yaml] ----- -apiVersion: camel.apache.org/v1 -kind: Kamelet -metadata: - name: my-timer-source -spec: - definition: - title: "Timer Example" - types: - out: - mediaType: text/plain - template: - from: - uri: timer:tick - steps: - - setBody: - constant: "Kamelet Main" - - to: "kamelet:sink" - versions: - v2: - definition: - title: "Timer Example 2" - types: - out: - mediaType: text/plain - template: - from: - uri: timer:tick - steps: - - setBody: - constant: "Kamelet V2" - - to: "kamelet:sink" ----- - -NOTE: make sure the overall content fits into 1 MiB, which is the storage limit for a Custom Resource. - -This is a way to handle multiple version on Kubernetes and may not be supported out of the box by Camel core. If the Integration will require specifically to use `kamelet:my-timer-source?kameletVersion=v2`, then, the operator will mount properly the specification on the running application. - -The `.spec.versions` field may not be necessarily supported by the core as it's meant to provide a way to handle versioning on the cluster only. The runtime must be provided with a materialized Kamelet file with the chosen spec (the operator is in charge of that). - -== Kamelet data types - -A Kamelet usually encapsulates a specific functionality and serves a very opinionated use case with well-defined input parameters and outcome. - -In order to enhance the Kamelet interoperability with other components the Kamelet may specify one to many data types for input, output and error scenarios. -The declaration of supported Kamelet data types helps users to incorporate the Kamelet into their specific applications. - -When referencing a Kamelet users may choose from a list of supported input/output data types in order to gain best fit for the individual use case. - -Following from that each Kamelet may declare all supported input/output data types each of them providing additional information like header names, content type, content schema and so on. - -.my-sample-source.kamelet.yaml -[source,yaml] ----- -apiVersion: camel.apache.org/v1 -kind: Kamelet -metadata: - name: my-sample-source - labels: - camel.apache.org/kamelet.type: "source" -spec: - definition: -# ... - dataTypes: - out: # <1> - default: application-json # <2> - headers: - MySpecialCamelHeaderName: # <3> - type: string - description: Some specific header - types: # <4> - application-json: - description: Output type as Json object - mediaType: application/json - schema: # <5> - type: object - description: The Json object representing the my-sample source output - properties: - # ... - dependencies: # <6> - - "camel:jackson" - text-plain: - description: Output type as plain text - mediaType: text/plain - template: - from: - uri: ... - steps: - - to: "kamelet:sink" ----- -<1> Declared output data types of this Kamelet source -<2> The output data type used by default -<3> Declaration of output headers with header name, type and description information -<4> List of supported output types -<5> Optional Json schema describing the `application-json` data type -<6> Optional list of additional dependencies that are required by the data type. - -The sample Kamelet above declares two supported output data types `application-json` and `text-plain`. -Each declared data type is backed by a specific Apache Camel transformer implementation that is capable of producing the specific output. -The respective transformer implementation may be provided by the Kamelet as a utility extension or by the underlying Apache Camel component. - -As a result the user may now choose the output data type when referencing the Kamelet in a binding. - -.my-sample-source-binding.yaml -[source,yaml] ----- -apiVersion: camel.apache.org/v1 -kind: Pipe -metadata: - name: my-sample-source-binding -spec: - source: - ref: - kind: Kamelet - apiVersion: camel.apache.org/v1 - name: my-sample-source - data-types: # <1> - out: - format: text-plain # <2> - sink: - uri: "log:info" ----- -<1> Chose the output data type on the Kamelet source reference in a Pipe. -<2> Select `text-plain` as an output data type of the `my-sample-source` Kamelet. - -The very same concept of data types can also be used on Kamelet sinks and input data types. -As soon as the user chooses a specific input data type for a Kamelet the Pipe processing will try to resolve a matching transformer implementation and apply its logic. - -NOTE: by default, the operator will use a `data-type-action` Kamelet that has to be an available Kamelet in the catalog. This is provided out of the box installing bundled Apache Kamelet catalog. It will fail if the Kamelet is not available. You can also override the Kamelet action to use adding the `camel.apache.org/kamelet.data.type` annotation to the Pipe specification. - -You may also use the `data-type-action` Kamelet in your Pipe binding in order to apply a specific data type transformation at any step. - -.my-sample-source-binding.yaml -[source,yaml] ----- -apiVersion: camel.apache.org/v1 -kind: Pipe -metadata: - name: my-sample-source-binding -spec: - source: - ref: - kind: Kamelet - apiVersion: camel.apache.org/v1 - name: my-sample-source - data-types: - out: - format: application-json # <1> - steps: - - ref: - kind: Kamelet - apiVersion: camel.apache.org/v1alpha1 - name: json-deserialize-action # <2> - - ref: - kind: Kamelet - apiVersion: camel.apache.org/v1alpha1 - name: resolve-pojo-schema-action # <3> - properties: - mimeType: "avro/binary" - schema: > - { "name": "User", "type": "record", "namespace": "demo.kamelets", "fields": [{ "name": "id", "type": "string" }, { "name": "firstname", "type": "string" }, { "name": "lastname", "type": "string" }, { "name": "age", "type": "int" }] } - - ref: - kind: Kamelet - apiVersion: camel.apache.org/v1alpha1 - name: data-type-action # <4> - properties: - scheme: "camel" - format: "avro-binary" - sink: - uri: "log:info" ----- -<1> Chose the output data `application-json` type on the Kamelet source. -<2> Deserialize the Json object with `json-deserialize-action`. -<3> Declare a Avro schema -<4> Use the `data-type-action` Kamelet to transform the Json object into Avro using the formerly declared schema - -The Pipe in the sample above uses a combination of Kamelet output data type, Json deserialization and Avro binary data type to transform the Kamelet source output. - -All referenced data types are backed by a specific transformer implementation either provided by the Kamelet itself or by pure Apache Camel functionality. - -== Creating a complex Kamelet - -We're now going to create a Kamelet with a high degree of complexity, to show how the Kamelet model can be used also to go over the -functionality provided by a single Camel Component. - -TIP: This example is complicated on purpose and uses several components and EIPs from Apache Camel: luckily your Kamelets will be much simpler than this one. - -It will be a Kamelet of type "source", but most of the principles explained here can be taken into account also when developing a Kamelet -of type "sink". The technical differences between the two scenarios will be highlighted in the xref:creating-sink["Creating a sink Kamelet"] section. - -We're going to take a real world use case having a moderate complexity: we want to create a source of eartquake events around the world, taking data from the https://earthquake.usgs.gov/fdsnws/event/1/[USGS APIs]. - -=== Step 1: write an end-to-end integration - -Contrary to what one might expect, the first thing you need to do is to *forget about Kamelets* and just try to write a Camel K integration that consumes the earthquake data. - -You may choose the language that you prefer to write the first integration, even writing it directly in YAML. -We write it using the Java DSL because that is the language that most Apache Camel users are familiar with and it's also supported by the tooling. - -TIP: For a great developer experience, we suggest to use https://code.visualstudio.com/[Visual Studio Code] with the https://marketplace.visualstudio.com/items?itemName=redhat.apache-camel-extension-pack[Camel Extension Pack] - -We start from scratch by creating an integration file with Camel JBang CLI: - -[source] ----- -camel init Earthquake.java ----- - -This will scaffold a Java source file with a timer-to-log integration, that we'll edit according to our need. -A first version of the integration might look like the following: - -.Earthquake.java -[source,java] ----- -// camel-k: language=java - -import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.Exchange; - -public class Earthquake extends RouteBuilder { - @Override - public void configure() throws Exception { - - from("timer:earthquake?period=10000") // <1> - .setHeader(Exchange.HTTP_METHOD).constant("GET") - .to("https://earthquake.usgs.gov/fdsnws/event/1/query?format=geojson") // <2> - .convertBodyTo(String.class) - .to("log:info"); // <3> - - } -} ----- -<1> We do a timed poll from the API because there's no way to consume it direcly -<2> Look at https://earthquake.usgs.gov/fdsnws/event/1/ for more information about the API. We're using the https://en.wikipedia.org/wiki/GeoJSON[GeoJSON] format -<3> The integration ends in a "log:info" endpoint, because we just want to see if we can contact the API and get some results back - -In order to run the integration above, if you have a Kubernetes cluster with Camel K installed, you can rely on that using `kamel run Earthquake.java`, but there's a -simpler solution that just requires your own machine: - -[source] ----- -camel run Earthquake.java ----- - -The `camel run` command relies on Camel JBang to locally run the integration. The integration will start and begin printing out earthquake data every 10 seconds. - -I show an excerpt of what is printed by the integration: - -[source,json] ----- -{ - "type":"FeatureCollection", - "metadata":{ - "generated":1614860715000, - "url":"https://earthquake.usgs.gov/fdsnws/event/1/query?format=geojson", - "title":"USGS Earthquakes", - "status":200, - "api":"1.10.3", - "count":10762 - }, - "features":[ - { - "type":"Feature", - "properties":{ - "mag":2.17, - "place":"27km ENE of Pine Valley, CA", - "time":1614859396200, - "updated":1614860064420, - "url":"https://earthquake.usgs.gov/earthquakes/eventpage/ci39808832", - "detail":"https://earthquake.usgs.gov/fdsnws/event/1/query?eventid=ci39808832&format=geojson", - "status":"automatic", - "tsunami":0, - "sig":72, - "net":"ci", - "code":"39808832", - "ids":",ci39808832,", - "sources":",ci,", - "types":",focal-mechanism,nearby-cities,origin,phase-data,scitech-link,", - "nst":57, - "dmin":0.04475, - "rms":0.22, - "gap":60, - "magType":"ml", - "type":"earthquake", - "title":"M 2.2 - 27km ENE of Pine Valley, CA" - }, - "geometry":{ - "type":"Point", - "coordinates":[ - -116.2648333, - 32.9236667, - 3.54 - ] - }, - "id":"ci39808832" - } - ] -} ----- - -NOTE: We've truncated the list of "features" to just the first one, but it contains a lot more data - -=== Step 2 (optional): iterate on the integration - -Since the integration above produces useful data, its route could be technically used to build a source Kamelet, but there are a few problems we may want to address before publishing it: - -1. It produces a lot of data (10762 events, last 30 days by default). We may want to start emitting events of the last e.g. 2 hours by default for this use case: we can add a filter on the query to accomplish this. -2. It produces a collection of features (earthquake events), while you may want to push to the destination the individual features. We can use Camel's built-in `split` and `jsonpath` support to split the collection into separate entries. -3. It continuously produces the same data: i.e. just wait another 10 seconds and you'll get the same data again and again (with a shift of 10 seconds over the last 30 days). A good approach here is to try to filter out duplicates at the source -as much as possible. We can think to store the time when the last update has been generated by the server and use it in subsequent queries to only obtain new events. -This will not guarantee an "exactly once" semantics, because e.g. if the integration is restarted it will lose the in-memory state and start from the beginning, -but it prevents sending an high amount of redundant data if the integration is kept alive. -To store the time when last result has been generated by the API, we can use one of the in-memory caches that Camel provides, such as xref:components::caffeine-cache-component.adoc[camel-caffeine-cache]. - -WARNING: We're going to use an in-memory cache because we need to store a single value. When using stateful data repositories, such as caches, it's always a good practice to limit their size to a low value and avoid them to increase their size over time - -TIP: If an end-to-end "exactly once" semantics is needed, you could later add a stateful idempotent repository in the global integration, but these aspects should be external to the Kamelet definition - -Let's try sorting out these issues in the route (we publish here the final version): - -.Earthquake.java -[source,java] ----- -// camel-k: language=java - -import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.model.ClaimCheckOperation; -import org.apache.camel.Exchange; - -public class Earthquake extends RouteBuilder { - @Override - public void configure() throws Exception { - - from("timer:earthquake?period=10000") - .setHeader("CamelCaffeineAction").constant("GET") - .toD("caffeine-cache:cache-${routeId}?key=lastUpdate") // <1> - .choice() - .when().simple("${header.CamelCaffeineActionHasResult}") - .setProperty("lastUpdate", body()) - .otherwise() - .setProperty("lastUpdate", simple("${date-with-timezone:now-120m:UTC:yyyy-MM-dd'T'HH:mm:ss.SSS}")) // <2> - .end() - .setHeader(Exchange.HTTP_METHOD).constant("GET") - .toD("https://earthquake.usgs.gov/fdsnws/event/1/query?format=geojson&updatedafter=${exchangeProperty.lastUpdate}&orderby=time-asc") // <3> - .unmarshal().json() - .setProperty("generated", simple("${body[metadata][generated]}")) // <4> - .setProperty("lastUpdate", simple("${date-with-timezone:exchangeProperty.generated:UTC:yyyy-MM-dd'T'HH:mm:ss.SSS}")) - .claimCheck(ClaimCheckOperation.Push) // <5> - .setBody().exchangeProperty("lastUpdate") - .setHeader("CamelCaffeineAction").constant("PUT") - .toD("caffeine-cache:cache-${routeId}?key=lastUpdate") - .claimCheck(ClaimCheckOperation.Pop) - .split().jsonpath("$.features[*]") // <6> - .marshal().json() - .to("log:info") // <7> - .end(); - - } -} ----- -<1> We start each poll by checking if there has been a previous run (and get the corresponding time) -<2> If it's the first run of the integration, we set the clock back to 120m from the current time, to get events of the last 2 hours -<3> We always include the time from which we want to receive updates in the query to the service -<4> The service returns a "generated" field which includes a timestamp when the response has been generated: we'll use it in the following requests -<5> We put the current body in the claim check stack to use it for storing the "lastUpdate" field in the cache, then we restore the previous body -<6> Individual records of the response are sent to the destination (which is "log:info" in this phase). In case an exception is thrown while processing a single entry, individual errors are sent to the route error handler and the processing continues - -TIP: Don't be scared from the complexity of the route, as this is a complicated example by choice: most of the Kamelets in the xref:camel-kamelets::index.adoc[Kamelet Catalog] don't use any processing logic or EIP - -WARNING: When writing a route like this, you should always think to errors that might happen in various phases of the execution: here the "lastUpdate" value in the cache is updated after a -successful invocation of the API but before the individual exchanges are sent to the destination, so that the source is protected by individual errors on the features (that are sent to the route error handler), -but continues to process new data if a single feature can't be processed. - -This integration (which seems complex at first sight, but it should be still readable) solves the issues identified above by using multiple features available in Apache Camel (caches, "Simple" language, HTTP component, JSON data format, splitter EIP, claim check, JSONPath). -Even if it's not recommended to write overly-complicated integrations in a Kamelet (i.e. consider writing a plain component if it becomes too complicated and unreadable), you can see here how powerful is the Kamelet model. - -TIP: We might have written the integration above in multiple routes connected using "direct:" endpoints, but a Kamelet contains a single route template and the mapping will -be easier if the integration is composed of a single route (it's also possible to define multiple supporting routes in a Kamelet, but we're not going to show how to do it here) - -=== Step 3: externalize parameters - -The next step in the development is answering the following question: if I was a user instantiating this source, what aspects I would like to configure? - -For the example above, there are 2 things that a user may want to configure: - -- `period`: the time interval between polls to the earthquake API. This may seem a technical issue, but it becomes a business issue when contacting APIs that do rate limiting -- `lookAhead`: the number of minutes before the current time I would like to receive events since (it affects the events received when the source is first started or restarted) - -Those two will become Kamelet parameters as you might expect, but for the time being, let's refactor the integration to externalize them as standard Camel K properties: - -.Earthquake.java -[source,java] ----- -// camel-k: language=java property=period=20000 property=lookAhead=120 <1> - -import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.model.ClaimCheckOperation; -import org.apache.camel.Exchange; - -public class Earthquake extends RouteBuilder { - @Override - public void configure() throws Exception { - - from("timer:earthquake?period={{period}}") // <2> - // ... - .choice() - .when().simple("${header.CamelCaffeineActionHasResult}") - .setProperty("lastUpdate", body()) - .otherwise() - .setProperty("lastUpdate", simple("${date-with-timezone:now-{{lookAhead}}m:UTC:yyyy-MM-dd'T'HH:mm:ss.SSS}")) // <3> - .end() - // ... - .end(); - - } -} ----- -<1> Modeline header defines the two parameters with a "development" value -<2> Placeholder `{\{period}}` is used -<3> Placeholder `{\{lookAhead}}` is used - -This looks the same as before, but notice that the `period` and `lookAhead` parameters are set in the modeline, while the route uses the `{\{period}}` -and `{\{lookAhead}}` placeholders instead of the actual values. - -As before, this integration can be tested with `camel run Earthquake.java` (the modeline parameters will be automatically added by the kamel CLI). - -=== Step 4 (optional): translate into YAML DSL - -The integration is now ready to be turned into a Kamelet, but in case you've not written it directly in YAML DSL, you need to convert it before proceeding. -The YAML DSL is the default DSL for Kamelets and the reason for that is that it provides multiple advantages over the other DSLs, -the most important one being the ability to easily compile YAML integrations into Quarkus-based binary executables in the future, -with all the advantages that derive from a point of view of performance and resource utilization. - -If we managed to reduce our integration to contain only a Camel route, converting it to YAML is straightforward: - -.earthquake.yaml -[source,yaml] ----- -# camel-k: language=yaml property=period=20000 property=lookAhead=120 dependency=camel-quarkus:caffeine dependency=camel-quarkus:http - -- from: - uri: "timer:earthquake" - parameters: - period: "{{period}}" - steps: - - setHeader: - name: CamelCaffeineAction - constant: GET - - toD: "caffeine-cache:cache-${routeId}?key=lastUpdate" - - choice: - when: - - simple: "${header.CamelCaffeineActionHasResult}" - steps: - - set-property: - name: lastUpdate - simple: "${body}" - otherwise: - steps: - - set-property: - name: lastUpdate - simple: "${date-with-timezone:now-{{lookAhead}}m:UTC:yyyy-MM-dd'T'HH:mm:ss.SSS}" - - setHeader: - name: CamelHttpMethod - constant: GET - - toD: "https://earthquake.usgs.gov/fdsnws/event/1/query?format=geojson&updatedafter=${exchangeProperty.lastUpdate}&orderby=time-asc" - - unmarshal: - json: {} - - set-property: - name: generated - simple: "${body[metadata][generated]}" - - set-property: - name: lastUpdate - simple: "${date-with-timezone:exchangeProperty.generated:UTC:yyyy-MM-dd'T'HH:mm:ss.SSS}" - - claim-check: - operation: Push - - setBody: - exchange-property: lastUpdate - - setHeader: - name: CamelCaffeineAction - constant: PUT - - toD: "caffeine-cache:cache-${routeId}?key=lastUpdate" - - claim-check: - operation: Pop - - split: - jsonpath: "$.features[*]" - steps: - - marshal: - json: {} - - to: "log:info" ----- - -If you compare the YAML version of the route to the Java one, you see that they map 1-1. - -TIP: The Camel Extension Pack for Visual Studio Code helps you writing the YAML route by providing auto-completion and error highlighting - -WARNING: Since the YAML DSL is quite new in the Camel ecosystem, it may miss some features available in the Java one, e.g. Camel K is not able to detect -some dependencies automatically and we've specified them in the modeline header - -This route can be run like the previous one using the `kamel` CLI: - -[source] ----- -camel run earthquake.yaml ----- - -=== Step 5: wrap it into a Kamelet - -We're about to write down an "Earthquake Source Kamelet" from the route we've built. -As starting point, we may just wrap the previous YAML route into the Kamelet envelope. The result looks like: - -.earthquake-source.kamelet.yaml -[source,yaml] ----- -apiVersion: camel.apache.org/v1 -kind: Kamelet -metadata: - name: earthquake-source - labels: - camel.apache.org/kamelet.type: "source" -spec: - template: # <1> - from: - uri: "timer:earthquake" - parameters: - period: "{{period}}" - steps: - - setHeader: - name: CamelCaffeineAction - constant: GET - - toD: "caffeine-cache:cache-${routeId}?key=lastUpdate" - - choice: - when: - - simple: "${header.CamelCaffeineActionHasResult}" - steps: - - set-property: - name: lastUpdate - simple: "${body}" - otherwise: - steps: - - set-property: - name: lastUpdate - simple: "${date-with-timezone:now-{{lookAhead}}m:UTC:yyyy-MM-dd'T'HH:mm:ss.SSS}" - - setHeader: - name: CamelHttpMethod - constant: GET - - toD: "https://earthquake.usgs.gov/fdsnws/event/1/query?format=geojson&updatedafter=${exchangeProperty.lastUpdate}&orderby=time-asc" - - unmarshal: - json: {} - - set-property: - name: generated - simple: "${body[metadata][generated]}" - - set-property: - name: lastUpdate - simple: "${date-with-timezone:exchangeProperty.generated:UTC:yyyy-MM-dd'T'HH:mm:ss.SSS}" - - claim-check: - operation: Push - - setBody: - exchange-property: lastUpdate - - setHeader: - name: CamelCaffeineAction - constant: PUT - - toD: "caffeine-cache:cache-${routeId}?key=lastUpdate" - - claim-check: - operation: Pop - - split: - jsonpath: "$.features[*]" - steps: - - marshal: - json: {} - - to: "kamelet:sink" # <2> ----- -<1> Flow contains the (single) route template we have identified before -<2> The old reference to "log:info" has been replaced with "kamelet:sink" here - -The only difference between the YAML route embedded in the Kamelet and the one identified before is the final sink, which was "log:info" and now is "kamelet:sink", i.e. -a placeholder that will be replaced with something else when the Kamelet is actually used (the user decides what is the destination of the earthquake events). - -=== Step 6: describe the parameters - -The Kamelet above is incomplete, we need to define the two parameters we've identified in the template and also give a description -to the Kamelet itself. The way to express all this information is via a https://json-schema.org/[JSON Schema] specification in the Kamelet YAML. - -.earthquake-source.kamelet.yaml -[source,yaml] ----- -apiVersion: camel.apache.org/v1 -kind: Kamelet -metadata: - name: earthquake-source - labels: - camel.apache.org/kamelet.type: "source" -spec: - definition: # <1> - title: Earthquake Source - description: |- - Get data about current earthquake events happening in the world using the USGS API - properties: - period: # <2> - title: Period between polls - description: The interval between fetches to the earthquake API in milliseconds - type: integer - default: 60000 - lookAhead: # <3> - title: Look-ahead minutes - description: The amount of minutes to look ahead when starting the integration afresh - type: integer - default: 120 - template: - from: - uri: "timer:earthquake" - # ... ----- -<1> The definition part starts with general information about the Kamelet -<2> Definition of the period parameter (used with the `{\{period}}` placeholder in the route) -<3> Definition of the lookAhead parameter - -TIP: In other scenarios, you might want to refer to non-required parameters in the Kamelet's `spec.template` using the `{{?optionalParam}}` syntax; that might be helpful for those cases where the non-required parameter does not define a default value in the Kamelet's `spec.definition.properties`. For more information, you can refer to the using Camel property placeholder syntax in the Camel Core project documentation. - -=== Step 7: add metadata and sugar - -We should complete the Kamelet with all mandatory (also optional) options that are described in https://github.com/apache/camel-kamelets[the guidelines for contributing Kamelets]. - -The final result should look like: - -.earthquake-source.kamelet.yaml -[source,yaml] ----- -apiVersion: camel.apache.org/v1 -kind: Kamelet -metadata: - name: earthquake-source - annotations: - camel.apache.org/kamelet.icon: "data:image/svg+xml;base64..." # truncated <1> - camel.apache.org/provider: "Apache Software Foundation" - labels: - camel.apache.org/kamelet.type: "source" - camel.apache.org/requires.runtime: "camel-quarkus" <2> -spec: - definition: - title: Earthquake Source - description: |- - Get data about current earthquake events happening in the world using the USGS API - properties: - period: - title: Period between polls - description: The interval between fetches to the earthquake API in milliseconds - type: integer - default: 60000 - lookAhead: - title: Look-ahead minutes - description: The amount of minutes to look ahead when starting the integration afresh - type: integer - default: 120 - dataTypes: # <3> - out: - default: json - types: - json: - mediaType: application/json - dependencies: # <4> - - camel-quarkus:caffeine - - camel-quarkus:http - template: - from: - uri: "timer:earthquake" - parameters: - period: "{{period}}" - steps: - - setHeader: - name: CamelCaffeineAction - constant: GET - - toD: "caffeine-cache:cache-${routeId}?key=lastUpdate" - - choice: - when: - - simple: "${header.CamelCaffeineActionHasResult}" - steps: - - set-property: - name: lastUpdate - simple: "${body}" - otherwise: - steps: - - set-property: - name: lastUpdate - simple: "${date-with-timezone:now-{{lookAhead}}m:UTC:yyyy-MM-dd'T'HH:mm:ss.SSS}" - - setHeader: - name: CamelHttpMethod - constant: GET - - toD: "https://earthquake.usgs.gov/fdsnws/event/1/query?format=geojson&updatedafter=${exchangeProperty.lastUpdate}&orderby=time-asc" - - unmarshal: - json: {} - - set-property: - name: generated - simple: "${body[metadata][generated]}" - - set-property: - name: lastUpdate - simple: "${date-with-timezone:exchangeProperty.generated:UTC:yyyy-MM-dd'T'HH:mm:ss.SSS}" - - claim-check: - operation: Push - - setBody: - exchange-property: lastUpdate - - setHeader: - name: CamelCaffeineAction - constant: PUT - - toD: "caffeine-cache:cache-${routeId}?key=lastUpdate" - - claim-check: - operation: Pop - - split: - jsonpath: "$.features[*]" - steps: - - marshal: - json: {} - - to: "kamelet:sink" ----- -<1> Add an icon with an appropriate license, better using svg+base64 URL encoding. You can encode icons using services like https://dopiaza.org/tools/datauri/index.php[this one] -<2> This marks the Kamelet as dependant on Quarkus since we're specifying explicit dependencies on Quarkus artifacts in the `spec` -> `dependencies` section -<3> The types section indicates that the Kamelet is going to produce JSON data by default. The Kamelet is able to define multiple data types for in/out/error. The user will then be able to choose on of the data types in a Pipe when referencing the Kamelet. -<4> Dependencies that we previously specified in the modeline options should be expressed now in the Kamelet spec - -The Kamelet is now ready to be used! - -=== Trying it out - -You can install the Kamelet on your Kubernetes instance to see if it can be picked up and used by the Camel K runtime. - -We assume that you're connected to a Kubernetes cluster and working on a namespace where the Camel K operator is allowed to materialize integrations. - -To create the Kamelet, you can execute: - -[source] ----- -kubectl apply -f earthquake-source.kamelet.yaml ----- - -If the Kamelet is valid, this will result in the Kamelet resource being created in the current namespace. - -To check if it works, you can create a simple binding: - -.earthquake-source-binding.yaml -[source,yaml] ----- -apiVersion: camel.apache.org/v1 -kind: Pipe -metadata: - name: earthquake-source-binding -spec: - source: - ref: # <1> - kind: Kamelet - apiVersion: camel.apache.org/v1 - name: earthquake-source - properties: - period: 10000 # <2> - sink: - uri: "log:info" # <3> ----- -<1> Kubernetes reference to the previously created Kamelet -<2> We redefine the period to speed it up, otherwise the default is used (60000) -<3> We just sink into "log:info", but we're free to change it to anything else - -NOTE: The developer write Camel DSL to make a Kamelet work, but the end-user uses it declaratively without any idea -of the complexity of the development process behind it - -Creating this resource will tell the operator to materialize the binding using an integration: - -[source] ----- -kubectl apply -f earthquake-source-binding.yaml ----- - -We can check the logs of the integration using: - -[source] ----- -kamel logs earthquake-source-binding ----- - -If everything went well, you should see the events in the log. - -Refer to the xref:kamelets/kamelets-user.adoc[Kamelets User Guide] for more information on how to use it in different contexts (like Knative, Kafka, etc.). - -[[creating-sink]] -== Creating a sink Kamelet - -So far we've focused on the steps needed to create Kamelets of type "source", but the same steps can be used for type "sink" Kamelets with -some minor changes. - -We're now going to create a "sink" Kamelet and look at the differences. For this part, we'll write a https://core.telegram.org/[Telegram] sink Kamelet. - -=== Analyze the use cases - -Differently from sources, where you usually generate a single type of data, or even multiple ones depending on some static user parameter, a sink should always -take into account that it can be fed dynamically with different type of data. - -For example, in the case of a Telegram sink, a user may want to send both textual data, but also images with (or without) a caption. - -In order to implement sending different kinds of data, the Kamelet should adapt according to the content that is received as input. - -We'll start by writing an end-to-end integration, then we'll convert it into a Kamelet. This time, we'll write routes directly in YAML DSL. - -TIP: For this particular use case, I've created a simple integration before to get the Chat ID corresponding to my phone from the bot: more info xref:components::telegram-component.adoc[here]. - -Let's start with a simple integration: - -.telegram.yaml -[source,yaml] ----- -# camel-k: language=yaml property=chatId=158584902 <1> - -- from: # <2> - uri: "direct:endpoint" - steps: - - to: - uri: "telegram:bots" - parameters: - authorizationToken: "{{authorizationToken}}" - chatId: "{{chatId}}" - - marshal: # <3> - json: {} - -- from: # <4> - uri: timer:tick - parameters: - period: 5000 - steps: - - setBody: - constant: Hello - - to: direct:endpoint ----- -<1> Setting the `chatId` property directly in modeline, the `authorizationToken` will be passed from command line -<2> The route that will become the Kamelet route template -<3> We marhsal the output as JSON because it may be required to be transferred over the wire -<4> A testing route to check if the integration works - -The end-to-end integration above should be good as initial scaffolding for the integration. -We can run it using the following command: - -[source] ----- -kamel run telegram.yaml -p authorizationToken=the-token-you-got-from-bot-father ----- - -If everything went well, you should get a "Hello" message into your phone every 5 seconds. - -Now, let's check if we can also send an image, by changing the second route: - -[source,yaml] ----- -# first route as before -# ... - -- from: - uri: timer:tick - parameters: - period: 5000 - steps: - - setHeader: - name: CamelHttpMethod - constant: GET - - to: https://github.com/apache/camel/raw/7204aa132662ab6cb8e3c5afea8b9b0859eff0e8/docs/img/logo.png - - to: direct:endpoint ----- - -The intended behavior is that we get the image in our phone via Telegram, but it's **throwing an error instead**. -This is something that often happens because standard Camel components are not suited to be used out-of-the-box as connectors. - -In this case, the Telegram component requires that a `CamelTelegramMediaType` header is set to `PHOTO_PNG` in the exchange in order -to accept the image, and that the body is converted to `byte[]`. -But we cannot require that who sends the message to the Kamelet obey to all Camel rules. In general we should follow these guidelines: - -- We SHOULD NOT require that the sender sets Camel-specific bits in the message over the wire (e.g. a `CamelTelegramMediaType`): we should hide Camel under the covers as much as possible -- We CAN use the "Content-Type" header to distinguish the type of incoming data -- We CAN define new headers and allow the users to set them on the incoming message (e.g. when the incoming message is a picture, we can let the -sender specify a caption for it in the "text" header) -- When defining an header, it MUST be documented in the Kamelet definition -- When defining an header, say "text", we should also account for an additional header named "ce-text": in some contexts, like Knative, only headers allowed by the CloudEvents specification are accepted in the brokers/channels (i.e. a `ce-` prefix is mandatory) - -When applied to the current use case, the main route can be changed into something like this: - -[source,yaml] ----- -- from: - uri: "direct:endpoint" - steps: - - choice: # <1> - when: - - simple: "${header[Content-Type]} == 'image/png'" - steps: - - convert-body-to: - type: "byte[]" - - setHeader: - name: CamelTelegramMediaType - constant: PHOTO_PNG - - simple: "${header[Content-Type]} == 'image/jpeg'" - steps: - - convert-body-to: - type: "byte[]" - - setHeader: - name: CamelTelegramMediaType - constant: PHOTO_JPG - otherwise: - steps: - - convert-body-to: - type: "java.lang.String" - - choice: # <2> - when: - - simple: "${header[text]}" - steps: - - setHeader: - name: CamelTelegramMediaTitleCaption - simple: "${header[text]}" - - simple: "${header[ce-text]}" - steps: - - setHeader: - name: CamelTelegramMediaTitleCaption - simple: "${header[ce-text]}" - - choice: # <3> - when: - - simple: "${header[chat-id]}" - steps: - - setHeader: - name: CamelTelegramChatId - simple: "${header[chat-id]}" - - simple: "${header[ce-chat-id]}" - steps: - - setHeader: - name: CamelTelegramChatId - simple: "${header[ce-chat-id]}" - - to: - uri: "telegram:bots" - parameters: - authorizationToken: "{{authorizationToken}}" - chatId: "{{chatId}}" - - marshal: - json: {} ----- -<1> We do content-type based conversion into appropriate objects for the component -<2> We allow specifying a `text` or `ce-text` header to set the image caption -<3> We allow overriding the chat ID using a `chat-id` or `ce-chat-id` header - -WARNING: It's not always obvious if it's responsibility of the Kamelet to prepare the exchange to be fed into the Camel producer endpoint or -if the Camel component should be changed to be more elastic. In this case, it seems appropriate to implement things like content-type base conversion -and support for streaming content at component level. The Kamelet above is acceptable for the time being, but it needs to be simplified if such changes land into the component. - -Having defined the main route template, we need to document the Kamelet and the parameters. We show here the final Kamelet: - -[source,yaml] ----- -apiVersion: camel.apache.org/v1 -kind: Kamelet -metadata: - name: telegram-sink - annotations: - camel.apache.org/kamelet.icon: "data:image/svg+xml;base64,..." # truncated - camel.apache.org/provider: "Apache Software Foundation" - labels: - camel.apache.org/kamelet.type: "sink" - camel.apache.org/kamelet.group: "Telegram" -spec: - definition: # <1> - title: "Telegram Sink" - description: |- - Send a message to a Telegram chat using your Telegram bot as sender. - - To create a bot, contact the @botfather account using the Telegram app. - - This sink supports the following message types: - - - Standard text messages - - PNG images (`Content-Type` must be set to `image/png`) - - JPEG images (`Content-Type` must be set to `image/jpeg`) - - This following message headers are also supported: - - - `text` / `ce-text`: when sending an image, the image caption - - `chat-id` / `ce-chat-id`: to override the default chat where messages are sent to - required: - - authorizationToken - properties: - authorizationToken: - title: Token - description: The token to access your bot on Telegram. You you can obtain it from the Telegram @botfather. - type: string - x-descriptors: - - urn:alm:descriptor:com.tectonic.ui:password - chatId: - title: Chat ID - description: The Chat ID where messages should be sent by default - type: string - dataTypes: # <2> - out: - default: json - types: - json: - mediaType: application/json - template: # <3> - from: - uri: "kamelet:source" - steps: - - choice: - when: - - simple: "${header[Content-Type]} == 'image/png'" - steps: - - log: h1 - - convert-body-to: - type: "byte[]" - - setHeader: - name: CamelTelegramMediaType - constant: PHOTO_PNG - - simple: "${header[Content-Type]} == 'image/jpeg'" - steps: - - convert-body-to: - type: "byte[]" - - setHeader: - name: CamelTelegramMediaType - constant: PHOTO_JPG - otherwise: - steps: - - convert-body-to: - type: "java.lang.String" - - choice: - when: - - simple: "${header[text]}" - steps: - - setHeader: - name: CamelTelegramMediaTitleCaption - simple: "${header[text]}" - - simple: "${header[ce-text]}" - steps: - - setHeader: - name: CamelTelegramMediaTitleCaption - simple: "${header[ce-text]}" - - choice: - when: - - simple: "${header[chat-id]}" - steps: - - setHeader: - name: CamelTelegramChatId - simple: "${header[chat-id]}" - - simple: "${header[ce-chat-id]}" - steps: - - setHeader: - name: CamelTelegramChatId - simple: "${header[ce-chat-id]}" - - to: - uri: "telegram:bots" - parameters: - authorizationToken: "{{authorizationToken}}" - chatId: "{{chatId}}" - - marshal: - json: {} ----- -<1> JSON schema definition of the Kamelet configuration -<2> The Kamelet has a single possible output of type JSON -<3> The flow identified above as Kamelet route template - -=== Try it out - -To try a sink Kamelet, we should feed it with some data. The best way to do it is to do it directly with another Kamelet. - -So, for example, to send a text message to a chat, we may create a binding like the following: - -.telegram-text-binding.yaml -[source,yaml] ----- -apiVersion: camel.apache.org/v1 -kind: Pipe -metadata: - name: telegram-text-binding -spec: - source: - ref: - kind: Kamelet - apiVersion: camel.apache.org/v1 - name: timer-source - properties: - period: 10000 - message: Hello first Kamelet! - sink: - ref: - kind: Kamelet - apiVersion: camel.apache.org/v1 - name: telegram-sink - properties: - authorizationToken: "put-your-own" - chatId: "your-chat-id" ----- - -You can create the Kamelet with: - -[source] ----- -kubectl apply -f telegram-sink.kamelet.yaml ----- - -Then apply the binding with: - -[source] ----- -kubectl apply -f telegram-text-binding.yaml ----- - -If everything goes well, you should get a "Hello first Kamelet!" message in your phone every 10 seconds. - -To check if we can also receive pictures using the above Kamelet, we can create the following binding: - -.telegram-text-binding.yaml -[source,yaml] ----- -apiVersion: camel.apache.org/v1 -kind: Pipe -metadata: - name: telegram-image-binding -spec: - source: - ref: - kind: Kamelet - apiVersion: camel.apache.org/v1 - name: http-source - properties: - url: "https://github.com/apache/camel/raw/7204aa132662ab6cb8e3c5afea8b9b0859eff0e8/docs/img/logo.png" - contentType: "image/png" - period: 10000 - sink: - ref: - kind: Kamelet - apiVersion: camel.apache.org/v1 - name: telegram-sink - properties: - authorizationToken: "put-your-own" - chatId: "your-chat-id" ----- - -This will create a new integration that forwards the Apache Camel logo to your phone every 10 seconds. - -== Testing - -The most obvious way to test a Kamelet is via an e2e tests that verifies if the Kamelet respects its specification. - -https://github.com/citrusframework/yaks[YAKS] is the framework of choice for such e2e tests. You can find more information and -documentation starting from the https://github.com/citrusframework/yaks[YAKS GitHub repository]. Here we'll provide examples for the Kamelets above. - -=== Testing a source - -YAKS allows writing a declarative https://cucumber.io/docs/gherkin/reference/[Gherkin] file to specify the behavior of the Kamelet. - -Let's try to test the earthquake Kamelet above, a Gherkin file for it should look like: - -.earthquake-source.feature -[source,gherkin] ----- -Feature: Kamelet earthquake-source works - - Background: - Given Disable auto removal of Kamelet resources - Given Disable auto removal of Kubernetes resources - Given Camel K resource polling configuration - | maxAttempts | 60 | - | delayBetweenAttempts | 3000 | - - Scenario: Bind Kamelet to service - Given create Kubernetes service test-service with target port 8080 - And bind Kamelet earthquake-source to uri http://test-service.${YAKS_NAMESPACE}.svc.cluster.local/test - When create Pipe earthquake-source-uri - Then Pipe earthquake-source-uri should be available - And Camel K integration earthquake-source-uri should be running - - Scenario: Verify binding - Given HTTP server "test-service" - And HTTP server timeout is 120000 ms - Then expect HTTP request header: Content-Type="application/json;charset=UTF-8" - And receive POST /test - And delete Pipe earthquake-source-uri ----- - -As you see this is a declarative test that is materialized into something that actually checks that the service generates some data. -Checks can be also more detailed than this one, but checking that it generates some JSON data is enough for a "smoke test" that verifies that the Kamelet -can be actually used. - -The test requires that you're connected to a Kubernetes cluster and have also YAKS installed (refer to the https://citrusframework.org/yaks/reference/html/index.html[YAKS documentation] for more information). -We're also going to use the CLI: - -[source] ----- -# We assume the Kamelet is already installed in the namespace -yaks run earthquake-source.feature ----- - -When testing a source, the backbone of the Gherking file that you'll write is similar to the one above. -Depending on the source under test, you may need to stimulate the production of some data using additional Gherking steps -before verifying that the data has been produced -(in our case, it's better not to try to stimulate an earthquake :D). - -=== Testing a sink - -A test for a sink is similar to the one for the source, except that we're going to generate data to feed it. - -To send data to the Kamelet we may think to bind it to another Kamelet of type `webhook-source`, that allows us to -send data to it via HTTP. Let's create a parameterized binding like the following one: - -.webhook-to-telegram.yaml -[source,yaml] ----- -apiVersion: camel.apache.org/v1 -kind: Pipe -metadata: - name: webhook-to-telegram -spec: - source: - ref: - kind: Kamelet - apiVersion: camel.apache.org/v1 - name: webhook-source - properties: - subpath: message - sink: - ref: - kind: Kamelet - apiVersion: camel.apache.org/v1 - name: telegram-sink - properties: - authorizationToken: "${telegram.authorization.token}" - chatId: "${telegram.chat.id}" ----- - -This will expose an HTTP endpoint that we can use to forward a message to Telegram. It requires that two parameters are set -in the YAKS configuration before creation. Those can be set in a simple property file: - -.telegram-credentials.properties -[source,properties] ----- -telegram.authorization.token=your-own-token -telegram.chat.id=your-own-chat ----- - -Then we're ready to define the feature we want to test, i.e. the ability to send a message via the Telegram API. - -An example of "smoke test" can be the following one: - -.telegram-sink.feature -[source,gherkin] ----- -Feature: Kamelet telegram-sink works - - Background: - Given Disable auto removal of Kamelet resources - Given Disable auto removal of Kubernetes resources - Given Camel K resource polling configuration - | maxAttempts | 60 | - | delayBetweenAttempts | 3000 | - - - Scenario: Bind webhook to Kamelet sink - Given load variables telegram-credentials.properties - And load Pipe webhook-to-telegram.yaml - Then Pipe webhook-to-telegram should be available - And Camel K integration webhook-to-telegram should be running - - - Scenario: Send a message to the Telegram Chat - Given URL: http://webhook-to-telegram.${YAKS_NAMESPACE}.svc.cluster.local - And HTTP request timeout is 60000 milliseconds - And wait for GET on path / to return 404 - Given HTTP request headers - | Content-Type | text/plain | - And HTTP request body - """ - Hello from YAKS! - """ - When send POST /message - Then receive HTTP 200 OK - And delete Pipe webhook-to-telegram - ----- - -This test will only check that the Telegram API accept the message created by the test. - -This can be run with the following command: - -[source] ----- -# We assume that both the webhook-source and the telegram-sink kamelet are already present in the namespace -yaks run telegram-sink.feature --resource webhook-to-telegram.yaml --resource telegram-credentials.properties ----- - -If everything goes well, you should receive a message during the test execution. - -For a more specific test that checks also the content sent to Telegram, you should add additional Gherking steps -to get and verify the actual message via other Telegram APIs. We're not going in so much details for this example, -but the Gherkin file highlighted above is a good approximation of the backbone you'll find in tests for Kamelets of type "sink". - -== KEDA Integration - -Kamelets of type `source` can be augmented with https://keda.sh/[KEDA] metadata to automatically configure autoscalers. - -The additional KEDA metadata is needed for the following purposes: - -- Map Kamelet properties to corresponding KEDA parameters -- Distinguish which KEDA parameters are needed for authentication (and need to be placed in a `Secret`) -- Mark KEDA parameters as required to signal an error during reconciliation - -[[kamelet-keda-dev]] -=== Basic properties to KEDA parameter mapping - -Any Kamelet property can be mapped to a KEDA parameter by simply declaring the mapping in the `x-descriptors` list. -For example: - -.aws-sqs-source.kamelet.yaml -[source,yaml] ----- -apiVersion: camel.apache.org/v1 -kind: Kamelet -metadata: - name: aws-sqs-source - labels: - camel.apache.org/kamelet.type: "source" -spec: - definition: - # ... - properties: - queueNameOrArn: - title: Queue Name - description: The SQS Queue Name or ARN - type: string - x-descriptors: - - urn:keda:metadata:queueURL # <1> - - urn:keda:required # <2> -# ... ----- -<1> The Kamelet property `queueNameOrArn` corresponds to a KEDA metadata parameter named `queueURL` -<2> The `queueURL` parameter is required by KEDA - -In the example above, the `queueNameOrArn` Kamelet property is declared to correspond to a KEDA *metadata* parameter named `queueURL`, using the `urn:keda:metadata:` prefix. -The `queueURL` parameter is documented in the https://keda.sh/docs/2.5/scalers/aws-sqs/[the KEDA AWS SQS Queue scaler] together with other options -required by KEDA to configure an autoscaler (it can be a full queue URL or a simple queue name). -By using the marker descriptor `urn:keda:required`, it is also marked as required by KEDA. - -The `queueURL` is a *metadata* parameter for the autoscaler. In order to configure *authentication* parameters, the syntax is slightly different: - -.aws-sqs-source.kamelet.yaml -[source,yaml] ----- -apiVersion: camel.apache.org/v1 -kind: Kamelet -metadata: - name: aws-sqs-source - labels: - camel.apache.org/kamelet.type: "source" -spec: - definition: - # ... - properties: - # ... - accessKey: - title: Access Key - description: The access key obtained from AWS - type: string - format: password - x-descriptors: - - urn:alm:descriptor:com.tectonic.ui:password - - urn:camel:group:credentials - - urn:keda:authentication:awsAccessKeyID <1> - - urn:keda:required -# ... ----- -<1> The Kamelet property `access` corresponds to a KEDA authentication parameter named `awsAccessKeyID` - -This time the property mapping uses the `urn:keda:authentication:` prefix, declaring it as a KEDA authentication parameter. -The difference between the two approaches is that authentication parameters will be injected into a secret by the Camel K -operator and linked to the KEDA ScaledObject using a TriggerAuthentication (refer to the https://keda.sh/[KEDA documentation] for more info). - -=== Advanced KEDA property mapping - -There are cases where KEDA requires some static values to be set in a ScaledObject or also values computed from multiple Kamelet properties. -To deal with these cases it's possible to use annotations on the Kamelet prefixed with `camel.apache.org/keda.metadata.` (for metadata parameters) -or `camel.apache.org/keda.authentication.` (for authentication parameters). Those annotations can contain plain fixed values or also *templates* (using the Go syntax). - -For example: - -.my-source.kamelet.yaml -[source,yaml] ----- -apiVersion: camel.apache.org/v1 -kind: Kamelet -metadata: - name: my-source - labels: - camel.apache.org/kamelet.type: "source" - annotations: - camel.apache.org/keda.authentication.sasl: "plaintext" # <1> - camel.apache.org/keda.metadata.queueLength: "5" # <2> - camel.apache.org/keda.metadata.queueAddress: "https://myhost.com/queues/{{.queueName}}" # <3> -spec: - definition: - # ... - properties: - queueName: - title: Queue Name - description: The Queue Name - type: string -# ... ----- -<1> An authentication parameter with a fixed value -<2> A metadata parameter with a fixed value -<3> A metadata parameter with a valued computed from a template - -When using the template syntax, all Kamelet properties are available as fields. The default values are used in case they are missing from the user configuration. - -For information on how to use Kamelets with KEDA, see the xref:kamelets/kamelets-user.adoc#kamelet-keda-user[KEDA section in the user guide]. diff --git a/docs/modules/ROOT/pages/kamelets/kamelets-user.adoc b/docs/modules/ROOT/pages/kamelets/kamelets-user.adoc index d39814ced..3cd130c8b 100644 --- a/docs/modules/ROOT/pages/kamelets/kamelets-user.adoc +++ b/docs/modules/ROOT/pages/kamelets/kamelets-user.adoc @@ -1,7 +1,6 @@ -= Kamelets user guide += How to configure a Kamelet -Speaking technically, a Kamelet is a resource that can be installed on any Kubernetes cluster. -The following is an example of a Kamelet that we'll use to discuss the various parts: +Speaking technically, a Kamelet is a resource that can be installed on any Kubernetes cluster or used as a plain yaml configuration in Apache Camel runtimes. The following is an example of a Kamelet that we'll use to discuss the various parts: .telegram-text-source.kamelet.yaml [source,yaml] @@ -68,23 +67,19 @@ At a high level (more details are provided later), a Kamelet resource describes: - An optional section containing information about input and output expected by the Kamelet (`types`) - A Camel flow in YAML DSL containing the implementation of the Kamelet (`flow`) -Once **installed on a Kubernetes namespace**, the Kamelet can be **used by any integration in that namespace**. - -Kamelets can be installed on a Kubernetes namespace with a simple command: +Once **installed on a Kubernetes namespace**, the Kamelet can be **used by any Integration in that namespace**. Kamelets can be installed on a Kubernetes namespace with a simple command: [source,shell] ---- kubectl apply -f telegram-text-source.kamelet.yaml ---- -Kamelets are standard YAML files, but their common extension is `.kamelet.yaml` to help IDEs to recognize them and provide auto-completion (in the future). +Kamelets are standard YAML files, but their common extension is `.kamelet.yaml` to help IDEs to recognize them and possibly provide auto-completion. [[kamelets-usage-integration]] == Using Kamelets in Integrations -Kamelets can be used in integrations **as if they were standard Camel components**. For example, -suppose that you've created the `telegram-text-source` Kamelet in the `default` namespace on Kubernetes, -then you can write the following integration to use the Kamelet: +Kamelets can be used in integrations **as if they were standard Camel components**. For example, suppose that you've created the `telegram-text-source` Kamelet in the `default` namespace on Kubernetes, then you can write the following integration to use the Kamelet: [source,yaml] .kamlet-route.yaml @@ -97,9 +92,7 @@ then you can write the following integration to use the Kamelet: NOTE: URI properties ("botToken") match the corresponding parameters in the Kamelet definition -Kamelets can also be used multiple times in the same route definition. This happens usually with sink Kamelets. - -Suppose that you've defined a Kamelet named "my-company-log-sink" in your Kubernetes namespace, then you can write a route like this: +Kamelets can also be used multiple times in the same route definition. This happens usually with sink Kamelets. Suppose that you've defined a Kamelet named "my-company-log-sink" in your Kubernetes namespace, then you can write a route like this: [source,yaml] .kamlet-multi-route.yaml @@ -120,7 +113,7 @@ The "my-company-log-sink" will obviously define what it means to write a log in When using a Kamelet, the instance parameters (e.g. "botToken", "bucket") can be passed explicitly in the URI or you can use properties. Properties can be also loaded implicitly by the operator from Kubernetes secrets (see below). -==== 1. URI based configuration +==== URI based configuration You can configure the Kamelet by passing directly the configuration parameters in the URI, as in: @@ -133,7 +126,7 @@ You can configure the Kamelet by passing directly the configuration parameters i In this case, "the-token-value" is passed explicitly in the URI (you can also pass a custom property placeholder as value). -==== 2. Property based configuration +==== Property based configuration An alternative way to configure the Kamelet is to provide configuration parameters as properties of the integration. @@ -177,47 +170,6 @@ Then the integration can be run with the following command: kamel run kamelet-properties-route.yaml --property file:kamelet-example.properties ---- -==== 3. Implicit configuration using secrets - -Property based configuration can also be used implicitly by creating secrets in the namespace that will be used to -determine the Kamelets configuration. - -To use implicit configuration via secret, we first need to create a configuration file holding only the properties of a named configuration. - -[source,properties] -.mynamedconfig.properties ----- -# Only configuration related to the "mynamedconfig" named config -camel.kamelet.my-company-log-sink.mynamedconfig.bucket=special -# camel.kamelet.my-company-log-sink.mynamedconfig.xxx=yyy ----- - -We can create a secret from the file and label it so that it will be picked up automatically by the operator: - -[source,shell] ----- -# Create the secret from the property file -kubectl create secret generic my-company-log-sink.mynamedconfig --from-file=mynamedconfig.properties -# Bind it to the named configuration "mynamedconfig" of the "my-company-log-sink" Kamelet -kubectl label secret my-company-log-sink.mynamedconfig camel.apache.org/kamelet=my-company-log-sink camel.apache.org/kamelet.configuration=mynamedconfig ----- - -You can now write an integration that uses the Kamelet with the named configuration: - -[source,yaml] -.kamlet-namedconfig-route.yaml ----- -- from: - uri: "timer:tick" - steps: - - setBody: - constant: "Hello" - - to: "kamelet:my-company-log-sink/mynamedconfig" ----- - -You can run this integration without specifying other parameters, the Kamelet endpoint will be implicitly configured by the Camel K operator that will -automatically mount the secret into the integration Pod. - === Kamelet versioning Kamelets provided in a catalog are generally meant to work with a given runtime version (the same for which they are released). However, when you create a Kamelet and publish to a cluster, you may want to store and use different versions. If the Kamelet is provided with more than the `main` version, then, you can specify which version to use in your Integration by adding the version parameter. For instance: @@ -233,662 +185,7 @@ Kamelets provided in a catalog are generally meant to work with a given runtime The operator will be able to automatically pick the right version and use it at runtime. If no version is specified, then you will use the default one. -[[kamelets-usage-binding]] -== Binding Kamelets in Pipes - -In some contexts (for example **"serverless"**) users often want to leverage the power of Apache Camel to be able to connect to various sources/sinks, without -doing additional processing (such as transformations or other enterprise integration patterns). - -A common use case is that of **Knative Event Sources**, for which the Apache Camel developers provide the concept of Kamelets and Pipes. -Kamelets represent an **evolution** of the Camel route templates to provide an opinionated and easy to use connector to various components and services. -The Kamelets allow using a declarative style of binding sources and sinks where data produced by a source, transformed in the form of actions steps and pushed to a given sink, via a resource named **Pipe**. - -=== Binding to Knative - -A Pipe allows to move data from a system described by a Kamelet towards a Knative destination, or from a Knative channel/broker to another external system described by a Kamelet. -This means Pipes may act as event sources and sinks for the Knative eventing broker in a declarative way. - -For example, here is a Pipe that connects a Kamelet Telegram source to the Knative broker: - -[source,yaml] ----- -apiVersion: camel.apache.org/v1 -kind: Pipe -metadata: - name: telegram-to-knative -spec: - source: # <1> - ref: - kind: Kamelet - apiVersion: camel.apache.org/v1 - name: telegram-text-source - properties: - botToken: the-token-here - sink: # <2> - ref: - kind: Broker - apiVersion: eventing.knative.dev/v1 - name: default ----- -<1> Reference to the source that provides data -<2> Reference to the sink where data should be sent to - -This binding takes the `telegram-text-source` Kamelet, configures it using specific properties ("botToken") and -makes sure that messages produced by the Kamelet are forwarded to the Knative **Broker** named "default". - -Note that source and sink are specified as standard **Kubernetes object references** in a declarative way. - -Knative eventing uses the CloudEvents data format by default. -You may want to set some properties that specify the event attributes such as the event type. - -[source,yaml] ----- -apiVersion: camel.apache.org/v1 -kind: Pipe -metadata: - name: telegram-to-knative -spec: - source: - ref: - kind: Kamelet - apiVersion: camel.apache.org/v1 - name: telegram-text-source - properties: - botToken: the-token-here - sink: - ref: - kind: Broker - apiVersion: eventing.knative.dev/v1 - name: default - properties: - type: org.apache.camel.telegram.events # <1> ----- -<1> Sets the event type attribute of the CloudEvent produced by this Pipe - -This way you may specify event attributes before publishing to the Knative broker. -Note that Camel uses a default CloudEvents event type `org.apache.camel.event` for events produced by Camel. - -You can overwrite CloudEvent event attributes on the sink using the `ce.overwrite.` prefix when setting a property. - -[source,yaml] ----- -apiVersion: camel.apache.org/v1 -kind: Pipe -metadata: - name: telegram-to-knative -spec: - source: - ref: - kind: Kamelet - apiVersion: camel.apache.org/v1 - name: telegram-text-source - properties: - botToken: the-token-here - sink: - ref: - kind: Broker - apiVersion: eventing.knative.dev/v1 - name: default - properties: - type: org.apache.camel.telegram.events - ce.overwrite.ce-source: my-source # <1> ----- -<1> Use "ce.overwrite.ce-source" to explicitly set the CloudEvents source attribute. - -The example shows how we can reference the "telegram-text-source" resource in a Pipe. -It's contained in the `source` section because it's a Kamelet of type "source". -A Kamelet of type "sink", by contrast, can only be used in the `sink` section of a `Pipe`. - -**Under the covers, a Pipe creates an Integration** resource that implements the binding, but all details of how to connect with -Telegram forwarding the data to the Knative broker is fully transparent to the end user. For instance the Integration uses a `SinkBinding` concept -under the covers in order to retrieve the Knative broker endpoint URL. - -In the same way you can also connect a Kamelet source to a Knative channel. - -[source,yaml] ----- -apiVersion: camel.apache.org/v1 -kind: Pipe -metadata: - name: telegram-to-knative-channel -spec: - source: # <1> - ref: - kind: Kamelet - apiVersion: camel.apache.org/v1 - name: telegram-text-source - properties: - botToken: the-token-here - sink: # <2> - ref: - kind: InMemoryChannel - apiVersion: messaging.knative.dev/v1 - name: messages ----- -<1> Reference to the source that provides data -<2> Reference to the Knative channel that acts as the sink where data should be sent to - -When reading data from Knative you just need to specify for instance the Knative broker as a source in the Pipe. -Events consumed from Knative event stream will be pushed to the given sink of the Pipe. - -[source,yaml] ----- -apiVersion: camel.apache.org/v1 -kind: Pipe -metadata: - name: knative-to-slack -spec: - source: # <1> - ref: - kind: Broker - apiVersion: eventing.knative.dev/v1 - name: default - properties: - type: org.apache.camel.event.messages - sink: # <2> - ref: - kind: Kamelet - apiVersion: camel.apache.org/v1 - name: slack-sink - properties: - channel: "#my-channel" - webhookUrl: the-webhook-url ----- -<1> Reference to the Knative broker source that provides data -<2> Reference to the sink where data should be sent to - -Once again, the Pipe provides a declarative way of creating event sources and sinks for Knative eventing. -In the example, all events of type `org.apache.camel.event.messages` get forwarded to the given Slack channel using the Webhook API. - -When consuming events from the Knative broker you most likely need to filter and select the events to process. -You can do that with the properties set on the Knative broker source reference, for instance filtering by the even type as shown in the example. -The filter possibilities include CloudEvent attributes such as event type, source, subject and extensions. - -In the background Camel K will automatically create a Knative Trigger resource for the Pipe that uses the filter attributes accordingly. - -.Sample trigger created by Camel K -[source,yaml] ----- -apiVersion: eventing.knative.dev/v1 -kind: Trigger -metadata: - name: camel-event-messages -spec: - broker: default # <1> - filter: - attributes: - type: org.apache.camel.event.messages - myextension: my-extension-value - subscriber: - ref: - apiVersion: serving.knative.dev/v1 # <2> - kind: Service - name: camel-service - uri: /events/camel.event.messages ----- -<1> Reference to the Knative broker source that provides data -<2> Reference to the Camel K integration/pipe service - -The trigger calls the Camel K integration service endpoint URL and pushes events with the given filter attributes to the Pipe. -All properties that you have set on the Knative broker source reference will be set as a filter attribute on the trigger resource (except for reserved properties such as `name` and `cloudEventsType`). - -Note that Camel K creates the trigger resource only for Knative broker type event sources. -In case you reference a Knative channel as a source in a Pipe Camel K assumes that the channel and the trigger are already present. -Camel K will only create the subscription for the integration service on the channel. - -=== Binding to a Kafka Topic - -The example seen in the previous paragraph can be also configured to push data a https://strimzi.io/[Strimzi] Kafka topic (Kamelets can be also configured to pull data from topics). - -To do so, you need to: - -- Install Strimzi on your cluster -- Create a Strimzi *Kafka* cluster using plain listener and **no authentication** -- Create a Strimzi *KafkaTopic* named `my-topic` - -Refer to the https://strimzi.io/[Strimzi documentation] for instructions on how to do that. - -The following binding can be created to push data into the `my-topic` topic: - -[source,yaml] ----- -apiVersion: camel.apache.org/v1 -kind: Pipe -metadata: - name: telegram-text-source-to-kafka -spec: - source: - ref: - kind: Kamelet - apiVersion: camel.apache.org/v1 - name: telegram-text-source - properties: - botToken: the-token-here - sink: - ref: # <1> - kind: KafkaTopic - apiVersion: kafka.strimzi.io/v1beta1 - name: my-topic ----- -<1> Kubernetes reference to a Strimzi KafkaTopic - -After creating it, messages will flow from Telegram to Kafka. - -=== Binding to an explicit URI - -An alternative way to use a Pipe is to configure the source/sink to be an explicit Camel URI. -For example, the following binding is allowed: - -[source,yaml] ----- -apiVersion: camel.apache.org/v1 -kind: Pipe -metadata: - name: telegram-text-source-to-channel -spec: - source: - ref: - kind: Kamelet - apiVersion: camel.apache.org/v1 - name: telegram-text-source - properties: - botToken: the-token-here - sink: - uri: https://mycompany.com/the-service # <1> ----- -<1> Pipe with explicitly URI - -This Pipe explicitly defines an URI where data is going to be pushed. - -NOTE: the `uri` option is also conventionally used in Knative to specify a non-kubernetes destination. -To comply with the Knative specifications, in case an "http" or "https" URI is used, Camel will send https://cloudevents.io/[CloudEvents] to the destination. - -=== Binding with data types - -When referencing Kamelets in a binding users may choose from one of the supported input/output data types provided by the Kamelet. -The supported data types are declared on the Kamelet itself and give additional information about used header names, content type and content schema. - -.my-sample-source-to-log.yaml -[source,yaml] ----- -apiVersion: camel.apache.org/v1 -kind: Pipe -metadata: - name: my-sample-source-to-log -spec: - source: - ref: - kind: Kamelet - apiVersion: camel.apache.org/v1 - name: my-sample-source - data-types: # <1> - out: - format: text-plain # <2> - sink: - uri: "log:info" ----- -<1> Specify the output data type on the referenced Kamelet source. -<2> Select `text-plain` as an output data type of the `my-sample-source` Kamelet. - -The very same Kamelet `my-sample-source` may also provide a CloudEvents specific data type as an output which fits perfect for binding to a Knative broker. - -.my-sample-source-to-knative.yaml -[source,yaml] ----- -apiVersion: camel.apache.org/v1 -kind: Pipe -metadata: - name: my-sample-source-to-knative -spec: - source: - ref: - kind: Kamelet - apiVersion: camel.apache.org/v1 - name: my-sample-source - data-types: - out: - format: application-cloud-events # <1> - sink: - ref: - kind: Broker - apiVersion: eventing.knative.dev/v1 - name: default ----- -<1> Select `application-cloud-events` as an output data type of the `my-sample-source` Kamelet. - -Information about the supported data types can be found on the Kamelet itself. - -.my-sample-source.kamelet.yaml -[source,yaml] ----- -apiVersion: camel.apache.org/v1 -kind: Kamelet -metadata: - name: my-sample-source - labels: - camel.apache.org/kamelet.type: "source" -spec: - definition: -# ... - dataTypes: - out: # <1> - default: text-plain # <2> - types: # <3> - text-plain: - description: Output type as plain text. - mediaType: text/plain - application-cloud-events: - description: CloudEvents specific representation of the Kamelet output. - mediaType: application/cloudevents+json - schema: # <4> - # ... - dependencies: # <5> - - "camel:cloudevents" - - template: - from: - uri: ... - steps: - - to: "kamelet:sink" ----- -<1> Declared output data types of this Kamelet source -<2> The output data type used by default -<3> List of supported output types -<4> Optional Json schema describing the `application/cloudevents+json` data type -<5> Optional list of additional dependencies that are required by the data type. - -This way users may choose the best Kamelet data type for a specific use case when referencing Kamelets in a binding. - -=== Error Handling - -You can configure an error handler in order to specify what to do when some event ends up with failure. See xref:kamelets/kameletbindings-error-handler.adoc[Pipes Error Handler User Guide] for more detail. - -=== Trait via annotations - -You can easily tune your `Pipe` with xref:traits:traits.adoc[traits] configuration adding `.metadata.annotations`. Let's have a look at the following example: - -[source,yaml] ----- -apiVersion: camel.apache.org/v1 -kind: Pipe -metadata: - name: timer-2-log-annotation - annotations: # <1> - trait.camel.apache.org/logging.level: DEBUG - trait.camel.apache.org/logging.color: "false" -spec: - source: - uri: timer:foo - sink: - uri: log:bar ----- -<1> Include `.metadata.annotations` to specify the list of traits we want to configure - -In this example, we've set the `logging` trait to specify certain configuration we want to apply. You can do the same with all the traits available, just by setting `trait.camel.apache.org/trait-name.trait-property` with the expected value. - -NOTE: if you need to specify an array of values, the syntax will be `trait.camel.apache.org/trait.conf: "[\"opt1\", \"opt2\", ...]"` - [[kamelets-troubleshooting]] == Troubleshooting -A `Kamelet` is translated into a `Route` used from the `Ìntegration`. In order to troubleshoot any possible issue, you can have a look at the dedicated xref:troubleshooting/debugging.adoc#debugging-kamelets[troubleshoot section]. - -[[kamelets-specification]] -== Kamelet Specification - -We're now going to describe the various parts of the Kamelet in more details. - -[[kamelets-specification-metadata]] -=== Metadata - -The metadata section contains important information related to the Kamelet as Kubernetes resource. - -.Metadata Fields -|=== -|name |Description |Type |Example - -|`name` -|ID of the Kamelet, used to refer to the Kamelet in external routes -|`string` -|E.g. `telegram-text-source` - -|`namespace` -|The Kubernetes namespace where the resource is installed -|`string` -| -|=== - -The following annotations and labels are also defined on the resource: - -.Annotations -|=== -|name |Description |Type |Example - -|`camel.apache.org/kamelet.icon` -|An optional icon for the Kamelet in URI data format -|`string` -|E.g. `data:image/svg+xml;base64,PD94bW...` - -|`trait.camel.apache.org/trait-name.trait-property` -|An optional configuration setting for a trait -|`string` -|E.g. `trait.camel.apache.org/logging.level: DEBUG` -|=== - -.Labels -|=== -|name |Description |Type |Example - -|label: `camel.apache.org/kamelet.type` -|Indicates if the Kamelet can be used as source, action or sink. -|enum: `source`, `action`, `sink` -|E.g. `source` -|=== - -[[kamelets-specification-definition]] -=== Definition - -The definition part of a Kamelet contains a valid JSON-schema document describing general information about the -Kamelet and all defined parameters. - - -.Definition Fields -|=== -|name |Description |Type |Example - -|`title` -|Display name of the Kamelet -|`string` -|E.g. `Telegram Text Source` - -|`description` -|A markdown description of the Kamelet -|`string` -|E.g. `Receive all text messages that people send to your telegram bot...` - -|`required` -|List of required parameters (complies with JSON-schema spec) -|array: `string` -| - -|`properties` -|Map of properties that can be configured on the Kamelet -|map: `string` -> `schema` -| - -|=== - -Each property defined in the Kamelet has its own schema (normally a flat schema, containing only 1 level of properties). -The following table lists some common fields allowed for each property. - -.Definition Parameters -|=== -|name |Description |Type |Example - -|`title` -|Display name of the property -|`string` -|E.g. `Token` - -|`description` -|Simple text description of the property -|`string` -|E.g. `The token to access your bot on Telegram` - -|`type` -|JSON-schema type of the property -|`string` -|E.g. `string` - -|`x-descriptors` -|Specific aids for the visual tools -|array: `string` -|E.g. `- urn:alm:descriptor:com.tectonic.ui:password` displays the property as a password field in a tectonic-type form -|=== - -=== Data shapes - -Kamelets are designed to be plugged as sources or sinks in more general routes, so they can accept data as input and/or -produce their own data. To help visual tools and applications to understand how to interact with the Kamelet, the -specification of a Kamelet includes also information about type of data that it manages. - -[source,yaml] ----- -# ... -spec: - # ... - dataTypes: - out: # <1> - default: json - types: - json: # <2> - mediaType: application/json - schema: # <3> - properties: - # ... ----- -<1> Defines the type of the `output` -<2> Name of the data type -<3> Optional JSON-schema definition of the output - -Data shape can be indicated for the following channels: - -- `in`: the input of the Kamelet, in case the Kamelet is of type `sink` -- `out`: the output of the Kamelet, for both `source` and `sink` Kamelets -- `error`: an optional error data shape, for both `source` and `sink` Kamelets - -Data shapes contain the following information: - -.Data Shape Options -|=== -|name |Description |Type |Example - -|`scheme` -|A specific component scheme that is used to identify the data shape -|`string` -|E.g. `aws2-s3` - -|`format` -|The data shape name used to identify and reference the data type in a Pipe when choosing from multiple data type options. -|`string` -|E.g. `json` - -|`mediaType` -|The media type of the data -|`string` -|E.g. `application/json` - -|`headers` -|Optional map of message headers that get set with the data shape where the map keys represent the header name and the value defines the header type information. -|`map` -| - -|`dependencies` -|Optional list of additional dependencies that are required for this data type (e.g. Json marshal/unmarshal libraries) -|`list` -|E.g. `mvn:org.jackson:jackson-databind` - -|`schema` -|An optional JSON-schema definition for the data -|`object` -| -|=== - -=== Flow - -Each Kamelet contains a YAML-based Camel DSL that provides the actual implementation of the connector. - -For example: - - -[source,yaml] ----- -spec: - # ... - template: - from: - uri: telegram:bots - parameters: - authorizationToken: "#property:botToken" - steps: - - convert-body-to: - type: "java.lang.String" - type-class: "java.lang.String" - charset: "UTF8" - - filter: - simple: "${body} != null" - - log: "${body}" - - to: "kamelet:sink" ----- - -Source and sink flows will connect to the outside route via the `kamelet:source` or `kamelet:sink` special endpoints: -- A source Kamelet must contain a call **to** `kamelet:sink` -- A sink Kamelet must start **from** `kamelet:source` - -NOTE: The `kamelet:source` and `kamelet:sink` endpoints are special endpoints that are only available in Kamelet route templates and will be replaced with actual references at runtime. - -Kamelets contain a **single route template** written in YAML DSL, as in the previous example. - -Kamelets, however, can also contain additional sources in the `spec` -> `sources` field. Those sources can be of any kind -(not necessarily route templates) and will be added once to all the integrations where the Kamelet is used. -They main role is to do advanced configuration of the integration context where the Kamelet is used, such as registering -beans in the registry or adding customizers. - -[[kamelet-keda-user]] -== KEDA enabled Kamelets - -Some Kamelets are enhanced with KEDA metadata to allow users to automatically configure autoscalers on them. -Kamelets with KEDA features can be distinguished by the presence of the annotation `camel.apache.org/keda.type`, -which is set to the name of a specific KEDA autoscaler. - -A KEDA enabled Kamelet can be used in the same way as any other Kamelet, in a binding or in an integration. -KEDA autoscalers are not enabled by default: they need to be manually enabled by the user via the `keda` trait. - -In a Pipe, the KEDA trait can be enabled using annotations: - -.my-keda-binding.yaml -[source,yaml] ----- -apiVersion: camel.apache.org/v1 -kind: Pipe -metadata: - name: my-keda-binding - annotations: - trait.camel.apache.org/keda.enabled: "true" -spec: - source: - # ... - sink: - # ... ----- - -In an integration, it can be enabled using `kamel run` args, for example: - -[source,shell] ----- -kamel run my-keda-integration.yaml -t keda.enabled=true ----- - -NOTE: Make sure that the `my-keda-integration` uses at least one KEDA enabled Kamelet, otherwise enabling KEDA (without other options) will have no effect. - -For information on how to create KEDA enabled Kamelets, see the xref:kamelets/kamelets-dev.adoc#kamelet-keda-dev[KEDA section in the development guide]. +A `Kamelet` is translated into a `Route` used from the `Integration`. In order to troubleshoot any possible issue, you can have a look at the dedicated xref:troubleshooting/debugging.adoc#debugging-kamelets[troubleshoot section]. diff --git a/docs/modules/ROOT/pages/kamelets/kamelets.adoc b/docs/modules/ROOT/pages/kamelets/kamelets.adoc index a9f97ce3a..2bea27233 100644 --- a/docs/modules/ROOT/pages/kamelets/kamelets.adoc +++ b/docs/modules/ROOT/pages/kamelets/kamelets.adoc @@ -18,7 +18,7 @@ The **Kamelet's interface** will define what parameters should be provided (e.g. **Internally**, the Kamelet defines how such events will be generated: it may involve connections to multiple systems using different protocols, transformations and so on. But everything will be **hidden to the end user**. -Kamelets are the fundamental **unit of abstraction** in the next-gen architecture of Apache Camel K. +Kamelets are a new **unit of abstraction** in the next-gen architecture of Apache Camel. A system as a whole can be technically described as the set of operations that you can do with it: if you use the language of Kamelets to describe a specific system, then other users can have access to all those operations with ease, no matter how complicated is the internal logic underlying all those operations. @@ -29,4 +29,4 @@ Kamelets are also expected to be **rendered on visual tools** that will provide They are generic connectors that can be used in multiples ways, depending on the context, so each UIs can use them for its own purpose. -Have a look at the xref:kamelets/kamelets-user.adoc[Kamelets User Guide] to learn how to immediately use Kamelets in Camel K or check xref:kamelets/kamelets-distribution.adoc[Distribution][how to use your own Kamelet catalog]. +Have a look at the xref:kamelets/kamelets-user.adoc[Kamelets User Guide] to learn how to immediately use Kamelets in Camel K or check xref:kamelets/kamelets-distribution.adoc[how to use your own Kamelet catalog]. diff --git a/docs/modules/ROOT/pages/kamelets/keda.adoc b/docs/modules/ROOT/pages/kamelets/keda.adoc new file mode 100644 index 000000000..23420aa65 --- /dev/null +++ b/docs/modules/ROOT/pages/kamelets/keda.adoc @@ -0,0 +1,109 @@ += KEDA (Kubernetes Event-Driven Autoscaling) + +Kamelets of type `source` can be augmented with https://keda.sh/[KEDA] metadata to automatically configure **autoscalers**. The additional KEDA metadata is needed for the following purposes: + +- Map Kamelet properties to corresponding KEDA parameters +- Distinguish which KEDA parameters are needed for authentication (and need to be placed in a `Secret`) +- Mark KEDA parameters as required to signal an error during reconciliation + +WARNING: this feature is in an experimental phase. + +[[kamelet-keda-dev]] +== Basic properties to KEDA parameter mapping + +Any Kamelet property can be mapped to a KEDA parameter by simply declaring the mapping in the `x-descriptors` list. +For example: + +.aws-sqs-source.kamelet.yaml +[source,yaml] +---- +apiVersion: camel.apache.org/v1 +kind: Kamelet +metadata: + name: aws-sqs-source + labels: + camel.apache.org/kamelet.type: "source" +spec: + definition: + # ... + properties: + queueNameOrArn: + title: Queue Name + description: The SQS Queue Name or ARN + type: string + x-descriptors: + - urn:keda:metadata:queueURL # <1> + - urn:keda:required # <2> +# ... +---- +<1> The Kamelet property `queueNameOrArn` corresponds to a KEDA metadata parameter named `queueURL` +<2> The `queueURL` parameter is required by KEDA + +In the example above, the `queueNameOrArn` Kamelet property is declared to correspond to a KEDA *metadata* parameter named `queueURL`, using the `urn:keda:metadata:` prefix. The `queueURL` parameter is documented in the https://keda.sh/docs/2.5/scalers/aws-sqs/[the KEDA AWS SQS Queue scaler] together with other options required by KEDA to configure an autoscaler (it can be a full queue URL or a simple queue name). + +By using the marker descriptor `urn:keda:required`, it is also marked as required by KEDA. + +The `queueURL` is a *metadata* parameter for the autoscaler. In order to configure *authentication* parameters, the syntax is slightly different: + +.aws-sqs-source.kamelet.yaml +[source,yaml] +---- +apiVersion: camel.apache.org/v1 +kind: Kamelet +metadata: + name: aws-sqs-source + labels: + camel.apache.org/kamelet.type: "source" +spec: + definition: + # ... + properties: + # ... + accessKey: + title: Access Key + description: The access key obtained from AWS + type: string + format: password + x-descriptors: + - urn:alm:descriptor:com.tectonic.ui:password + - urn:camel:group:credentials + - urn:keda:authentication:awsAccessKeyID <1> + - urn:keda:required +# ... +---- +<1> The Kamelet property `access` corresponds to a KEDA authentication parameter named `awsAccessKeyID` + +This time the property mapping uses the `urn:keda:authentication:` prefix, declaring it as a KEDA authentication parameter. The difference between the two approaches is that authentication parameters will be injected into a secret by the Camel K operator and linked to the KEDA ScaledObject using a TriggerAuthentication (refer to the https://keda.sh/[KEDA documentation] for more info). + +== Advanced KEDA property mapping + +There are cases where KEDA requires some static values to be set in a ScaledObject or also values computed from multiple Kamelet properties. To deal with these cases it's possible to use annotations on the Kamelet prefixed with `camel.apache.org/keda.metadata.` (for metadata parameters) or `camel.apache.org/keda.authentication.` (for authentication parameters). Those annotations can contain plain fixed values or also *templates* (using the Go syntax). For example: + +.my-source.kamelet.yaml +[source,yaml] +---- +apiVersion: camel.apache.org/v1 +kind: Kamelet +metadata: + name: my-source + labels: + camel.apache.org/kamelet.type: "source" + annotations: + camel.apache.org/keda.authentication.sasl: "plaintext" # <1> + camel.apache.org/keda.metadata.queueLength: "5" # <2> + camel.apache.org/keda.metadata.queueAddress: "https://myhost.com/queues/{{.queueName}}" # <3> +spec: + definition: + # ... + properties: + queueName: + title: Queue Name + description: The Queue Name + type: string +# ... +---- +<1> An authentication parameter with a fixed value +<2> A metadata parameter with a fixed value +<3> A metadata parameter with a valued computed from a template + +When using the template syntax, all Kamelet properties are available as fields. The default values are used in case they are missing from the user configuration. For information on how to use Kamelets with KEDA see xref:pipes/pipes.adoc#kamelet-keda-user[how to run Pipes with KEDA]. diff --git a/docs/modules/ROOT/pages/kamelets/kameletbindings-error-handler.adoc b/docs/modules/ROOT/pages/pipes/error-handler.adoc similarity index 91% rename from docs/modules/ROOT/pages/kamelets/kameletbindings-error-handler.adoc rename to docs/modules/ROOT/pages/pipes/error-handler.adoc index 747533fa0..9d78cf1bf 100644 --- a/docs/modules/ROOT/pages/kamelets/kameletbindings-error-handler.adoc +++ b/docs/modules/ROOT/pages/pipes/error-handler.adoc @@ -1,17 +1,13 @@ -[[bindings-error-handler]] = Pipes Error Handler -[[bindings-error-handler-introduction]] -== Introduction - -Pipes offer a mechanism to specify an error policy to adopt in case an event produced by a `source` or consumed by a `sink`. Through the definition of an `errorHandler` you will be able to apply certain logic to the failing event, such as simply logging, ignoring the event or posting the event to a `Sink`. +Pipes offer a mechanism to specify an error policy to adopt in case an event produced by a `source` or consumed by a `sink`. Through the definition of an `errorHandler` you will be able to apply certain logic to the failing event, such as simply logging, ignoring the event or posting the event to another `Sink`. [source,yaml] ---- apiVersion: camel.apache.org/v1 kind: Pipe metadata: - name: my-kamelet-binding + name: my-binding spec: source: # <1> ... @@ -38,7 +34,7 @@ There may be certain cases where you want to just ignore any failure happening o apiVersion: camel.apache.org/v1 kind: Pipe metadata: - name: my-kamelet-binding + name: my-binding spec: source: ... @@ -59,7 +55,7 @@ Apache Camel offers a default behavior for handling any failure: log to standard apiVersion: camel.apache.org/v1 kind: Pipe metadata: - name: my-kamelet-binding + name: my-binding spec: source: ... @@ -83,7 +79,7 @@ The `Sink` is probably the most interesting error handler type as it allows you apiVersion: camel.apache.org/v1 kind: Pipe metadata: - name: my-kamelet-binding + name: my-binding spec: source: ... @@ -106,4 +102,3 @@ spec: <1> You can use `ref` or `uri`. `ref` will be interpreted by the operator according the `kind`, `apiVersion` and `name`. You can use any `Kamelet`, `KafkaTopic` channel or `Knative` destination. <2> Properties belonging to the endpoint (in this example, to the `Kamelet` named error handler) <3> Parameters belonging to the `sink` error handler type - diff --git a/docs/modules/ROOT/pages/pipes/pipes.adoc b/docs/modules/ROOT/pages/pipes/pipes.adoc index 1aeb14cc0..715fa113b 100644 --- a/docs/modules/ROOT/pages/pipes/pipes.adoc +++ b/docs/modules/ROOT/pages/pipes/pipes.adoc @@ -54,7 +54,36 @@ spec: - uri: https://gist.githubusercontent.com/squakez/48b4ebf24c2579caf6bcb3e8a59fa509/raw/c7d9db6ee5e8851f5dc6a564172d85f00d87219c/gistfile1.txt ``` -In the example above we're making sure to call an intermediate resource in order to fill the content with some value. +In the example above we're making sure to call an intermediate resource in order to fill the content with some value. This **action** is configured in the `.spec.steps` parameter. + +=== Traits configuration + +Although this should not be necessarily required (the operator do all the required configuration for you), you can tune your `Pipe` with xref:traits:traits.adoc[traits] configuration adding `.metadata.annotations`. Let's have a look at the following example: + +[source,yaml] +---- +apiVersion: camel.apache.org/v1 +kind: Pipe +metadata: + name: timer-2-log-annotation + annotations: # <1> + trait.camel.apache.org/logging.level: DEBUG + trait.camel.apache.org/logging.color: "false" +spec: + source: + uri: timer:foo + sink: + uri: log:bar +---- +<1> Include `.metadata.annotations` to specify the list of traits we want to configure + +In this example, we've set the `logging` trait to specify certain configuration we want to apply. You can do the same with all the traits available, just by setting `trait.camel.apache.org/trait-name.trait-property` with the expected value. + +NOTE: if you need to specify an array of values, the syntax will be `trait.camel.apache.org/trait.conf: "[\"opt1\", \"opt2\", ...]"` + +== Using Kamel CLI + +Camel K works very well with any Kubernetes compatible user interface (such as CLI as `kubectl`, `oc` or any other visual tooling). However we do provide a simple CLI that helps you performing most of the Pipe works in an easier fashion: it's xref:pipes/bind-cli.adoc[`kamel` CLI]. == Differences with Integrations @@ -64,7 +93,7 @@ Most of the time you will have consumer applications (one Pipe) which are consum NOTE: Camel K operator will allow you to use directly Kafka (Strimzi) and Knative endpoints custom resources. -== Examples +== More advanced examples Here some other examples involving Kamelets, Knative and Kafka. @@ -99,9 +128,9 @@ spec: username: my-usr ``` -=== Consuming events from a Kafka topic +=== Binding to Kafka topics -Another typical use case is consume/produce events directly from a KafkaTopic custom resource (managed by Strimzi operator) or Knative resources: +Another typical use case is consume/produce events directly from a KafkaTopic custom resource (managed by https://strimzi.io/[Strimzi] operator): ```yaml apiVersion: camel.apache.org/v1 @@ -123,6 +152,340 @@ spec: name: beer-events ``` -== Using Kamel CLI +NOTE: the Strimzi operator is required to be installed and a KafkaTopic configured. -Camel K works very well with any Kubernetes compatible user interface (such as CLI as `kubectl`, `oc` or any other visual tooling). However we do provide a simple CLI that helps you performing most of the Pipe works in an easier fashion: it's xref:pipes/bind-cli.adoc[`kamel` CLI]. +=== Binding to Knative resources + +A Pipe allows to move data from a system described by a Kamelet towards a https://knative.dev[Knative] destination, or from a Knative channel/broker to another external system described by a Kamelet. This means Pipes may act as event sources and sinks for the Knative eventing broker in a declarative way. + +NOTE: all examples require Knative operator installed and the related resources configured as well. + +For example, here is a Pipe that connects a Kamelet Telegram source to the Knative broker: + +[source,yaml] +---- +apiVersion: camel.apache.org/v1 +kind: Pipe +metadata: + name: telegram-to-knative +spec: + source: # <1> + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: telegram-text-source + properties: + botToken: the-token-here + sink: # <2> + ref: + kind: Broker + apiVersion: eventing.knative.dev/v1 + name: default +---- +<1> Reference to the source that provides data +<2> Reference to the sink where data should be sent to + +This binding takes the `telegram-text-source` Kamelet, configures it using specific properties ("botToken") and makes sure that messages produced by the Kamelet are forwarded to the Knative **Broker** named "default". Note that source and sink are specified as standard **Kubernetes object references** in a declarative way. Knative eventing uses the CloudEvents data format by default. You may want to set some properties that specify the event attributes such as the event type. + +[source,yaml] +---- +apiVersion: camel.apache.org/v1 +kind: Pipe +metadata: + name: telegram-to-knative +spec: + source: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: telegram-text-source + properties: + botToken: the-token-here + sink: + ref: + kind: Broker + apiVersion: eventing.knative.dev/v1 + name: default + properties: + type: org.apache.camel.telegram.events # <1> +---- +<1> Sets the event type attribute of the CloudEvent produced by this Pipe + +This way you may specify event attributes before publishing to the Knative broker. Note that Camel uses a default CloudEvents event type `org.apache.camel.event` for events produced by Camel. You can overwrite CloudEvent event attributes on the sink using the `ce.overwrite.` prefix when setting a property. + +[source,yaml] +---- +apiVersion: camel.apache.org/v1 +kind: Pipe +metadata: + name: telegram-to-knative +spec: + source: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: telegram-text-source + properties: + botToken: the-token-here + sink: + ref: + kind: Broker + apiVersion: eventing.knative.dev/v1 + name: default + properties: + type: org.apache.camel.telegram.events + ce.overwrite.ce-source: my-source # <1> +---- +<1> Use "ce.overwrite.ce-source" to explicitly set the CloudEvents source attribute. + +The example shows how we can reference the "telegram-text-source" resource in a Pipe. It's contained in the `source` section because it's a Kamelet of type "source". A Kamelet of type "sink", by contrast, can only be used in the `sink` section of a `Pipe`. + +Under the covers, a Pipe creates an Integration resource that implements the binding, but all details of how to connect with Telegram forwarding the data to the Knative broker is fully transparent to the end user. For instance the Integration uses a `SinkBinding` concept under the covers in order to retrieve the Knative broker endpoint URL. + +In the same way you can also connect a Kamelet source to a Knative channel. + +[source,yaml] +---- +apiVersion: camel.apache.org/v1 +kind: Pipe +metadata: + name: telegram-to-knative-channel +spec: + source: # <1> + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: telegram-text-source + properties: + botToken: the-token-here + sink: # <2> + ref: + kind: InMemoryChannel + apiVersion: messaging.knative.dev/v1 + name: messages +---- +<1> Reference to the source that provides data +<2> Reference to the Knative channel that acts as the sink where data should be sent to + +When reading data from Knative you just need to specify for instance the Knative broker as a source in the Pipe. Events consumed from Knative event stream will be pushed to the given sink of the Pipe. + +[source,yaml] +---- +apiVersion: camel.apache.org/v1 +kind: Pipe +metadata: + name: knative-to-slack +spec: + source: # <1> + ref: + kind: Broker + apiVersion: eventing.knative.dev/v1 + name: default + properties: + type: org.apache.camel.event.messages + sink: # <2> + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: slack-sink + properties: + channel: "#my-channel" + webhookUrl: the-webhook-url +---- +<1> Reference to the Knative broker source that provides data +<2> Reference to the sink where data should be sent to + +Once again, the Pipe provides a declarative way of creating event sources and sinks for Knative eventing. In the example, all events of type `org.apache.camel.event.messages` get forwarded to the given Slack channel using the Webhook API. + +When consuming events from the Knative broker you most likely need to filter and select the events to process. You can do that with the properties set on the Knative broker source reference, for instance filtering by the even type as shown in the example. The filter possibilities include CloudEvent attributes such as event type, source, subject and extensions. + +In the background Camel K will automatically create a Knative Trigger resource for the Pipe that uses the filter attributes accordingly. + +.Sample trigger created by Camel K +[source,yaml] +---- +apiVersion: eventing.knative.dev/v1 +kind: Trigger +metadata: + name: camel-event-messages +spec: + broker: default # <1> + filter: + attributes: + type: org.apache.camel.event.messages + myextension: my-extension-value + subscriber: + ref: + apiVersion: serving.knative.dev/v1 # <2> + kind: Service + name: camel-service + uri: /events/camel.event.messages +---- +<1> Reference to the Knative broker source that provides data +<2> Reference to the Camel K integration/pipe service + +The trigger calls the Camel K integration service endpoint URL and pushes events with the given filter attributes to the Pipe. All properties that you have set on the Knative broker source reference will be set as a filter attribute on the trigger resource (except for reserved properties such as `name` and `cloudEventsType`). + +Note that Camel K creates the trigger resource only for Knative broker type event sources. In case you reference a Knative channel as a source in a Pipe Camel K assumes that the channel and the trigger are already present. Camel K will only create the subscription for the integration service on the channel. + +=== Binding to an explicit URI + +An alternative way to use a Pipe is to configure the source/sink to be an explicit Camel URI. For example, the following binding is allowed: + +[source,yaml] +---- +apiVersion: camel.apache.org/v1 +kind: Pipe +metadata: + name: telegram-text-source-to-channel +spec: + source: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: telegram-text-source + properties: + botToken: the-token-here + sink: + uri: https://mycompany.com/the-service # <1> +---- +<1> Pipe with explicitly URI + +This Pipe explicitly defines an URI where data is going to be pushed. + +NOTE: the `uri` option is also conventionally used in Knative to specify a non-kubernetes destination. To comply with the Knative specifications, in case an "http" or "https" URI is used, Camel will send https://cloudevents.io/[CloudEvents] to the destination. + +== Binding with data types + +When referencing Kamelets in a binding users may choose from one of the supported input/output data types provided by the Kamelet. The supported data types are declared on the Kamelet itself and give additional information about used header names, content type and content schema. + +.my-sample-source-to-log.yaml +[source,yaml] +---- +apiVersion: camel.apache.org/v1 +kind: Pipe +metadata: + name: my-sample-source-to-log +spec: + source: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: my-sample-source + data-types: # <1> + out: + format: text-plain # <2> + sink: + uri: "log:info" +---- +<1> Specify the output data type on the referenced Kamelet source. +<2> Select `text-plain` as an output data type of the `my-sample-source` Kamelet. + +The very same Kamelet `my-sample-source` may also provide a CloudEvents specific data type as an output which fits perfect for binding to a Knative broker. + +.my-sample-source-to-knative.yaml +[source,yaml] +---- +apiVersion: camel.apache.org/v1 +kind: Pipe +metadata: + name: my-sample-source-to-knative +spec: + source: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1 + name: my-sample-source + data-types: + out: + format: application-cloud-events # <1> + sink: + ref: + kind: Broker + apiVersion: eventing.knative.dev/v1 + name: default +---- +<1> Select `application-cloud-events` as an output data type of the `my-sample-source` Kamelet. + +Information about the supported data types can be found on the Kamelet itself. + +.my-sample-source.kamelet.yaml +[source,yaml] +---- +apiVersion: camel.apache.org/v1 +kind: Kamelet +metadata: + name: my-sample-source + labels: + camel.apache.org/kamelet.type: "source" +spec: + definition: +# ... + dataTypes: + out: # <1> + default: text-plain # <2> + types: # <3> + text-plain: + description: Output type as plain text. + mediaType: text/plain + application-cloud-events: + description: CloudEvents specific representation of the Kamelet output. + mediaType: application/cloudevents+json + schema: # <4> + # ... + dependencies: # <5> + - "camel:cloudevents" + + template: + from: + uri: ... + steps: + - to: "kamelet:sink" +---- +<1> Declared output data types of this Kamelet source +<2> The output data type used by default +<3> List of supported output types +<4> Optional Json schema describing the `application/cloudevents+json` data type +<5> Optional list of additional dependencies that are required by the data type. + +This way users may choose the best Kamelet data type for a specific use case when referencing Kamelets in a binding. + +[[kamelet-keda-user]] +== KEDA enabled Pipes + +Some Kamelets are enhanced with https://keda.sh/[KEDA] metadata to allow users to automatically configure autoscalers on them. Kamelets with KEDA features can be distinguished by the presence of the annotation `camel.apache.org/keda.type`, which is set to the name of a specific KEDA autoscaler. + +WARNING: this feature is in an experimental phase. + +A KEDA enabled Kamelet can be used in the same way as any other Kamelet, in a Pipe or in an Integration. KEDA autoscalers are not enabled by default: they need to be manually enabled by the user via the `keda` trait. + +NOTE: KEDA operator is required to run on the cluster. + +In a Pipe, the KEDA trait can be enabled using annotations: + +.my-keda-binding.yaml +[source,yaml] +---- +apiVersion: camel.apache.org/v1 +kind: Pipe +metadata: + name: my-keda-binding + annotations: + trait.camel.apache.org/keda.enabled: "true" +spec: + source: + # ... + sink: + # ... +---- + +In an integration, it can be enabled using `kamel run` args, for example: + +[source,shell] +---- +kamel run my-keda-integration.yaml -t keda.enabled=true +---- + +NOTE: Make sure that the `my-keda-integration` uses at least one KEDA enabled Kamelet, otherwise enabling KEDA (without other options) will have no effect. + +For information on how to create KEDA enabled Kamelets, see the xref:kamelets/kamelets-dev.adoc#kamelet-keda-dev[KEDA section in the development guide]. \ No newline at end of file