lburgazzoli closed pull request #247: Allow to push to Knative channels URL: https://github.com/apache/camel-k/pull/247
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/gke-setup.adoc b/docs/gke-setup.adoc index 5aa35aa2..5bc39ebf 100644 --- a/docs/gke-setup.adoc +++ b/docs/gke-setup.adoc @@ -1,24 +1,24 @@ [[gke-cluster]] Configuring a Google Kubernetes Engine (GKE) Cluster -============================== +==================================================== -This guide assumes you've already create a Kubernetes Engine cluster on https://console.cloud.google.com. +This guide assumes you've already created a Kubernetes Engine cluster on https://console.cloud.google.com. Make sure you've selected a version of Kubernetes greater than **1.11** when creating the cluster. You can create it in any region. -In the list of clusters for the current project, GKE provides a connection script that you need to execute on a shell to configure the `kubectl` command. +In the list of clusters for the current project, GKE provides a connection string that you need to execute on a shell to configure the `kubectl` command. -NOTE: the script contains a `--project` flag that indicates your **project ID**. You should keep that information for the last step. +NOTE: the connection string contains a `--project` flag that indicates your **project ID**. You should keep that information for the last step. -After executing the connection script, if everything is installed correctly, you should be able to execute: +After executing the connection string, if everything is installed correctly, you should be able to execute: ``` kubectl get pod ``` -When the cluster is first installed, you should find that no pods are present in the cluster. You can proceed with the installation then. +When the cluster is first installed, you should find that "no pods are present" in the cluster. You can proceed with the installation then. -Before installing Camel K on a fresh GKE cluster, you need to perform a extra step to give to your account the required cluster-admin permissions. +Before installing Camel K on a fresh GKE cluster, you need to perform some extra steps to give to your account the required cluster-admin permissions. This means executing the following command (**replacing "your-addr...@gmail.com" with your account email address**): ``` @@ -36,25 +36,27 @@ The best way to obtain a valid key is from the web console: - To avoid confusion, it's suggested to use the "English" language in preferences of the Google Cloud console - Select "IAM & admin" from the navigation menu, then "Service accounts" - Create a new service account specifying the following id: **"camel-k-builder"** -- You'll be asked to select a role. It's important to **select the **"Storage Admin" role** from the "Storage" menu +- You'll be asked to select a role. It's important to select the **"Storage Admin" role** from the "Storage" menu - Finish creating the service account - From the action menu of the service account you've created, **create a key** using the JSON format A `.json` file with the key will be downloaded to your machine. You need to store that key in a Kubernetes secret. It's **important** to rename the file you've just downloaded to `kaniko-secret.json` (make sure you write it correctly). -After the renaming, execute the following command: +After the renaming, execute the following command to create the secret: ``` kubectl create secret generic kaniko-secret --from-file=kaniko-secret.json ``` -You're ready to install Camel K. You should execute the following command to install it correctly: +You're ready to install Camel K. You should now execute the following command to install cluster resources and the operator (in the current namespace): ``` kamel install --registry gcr.io --organization <<your-project-id>> --push-secret kaniko-secret ``` -Use the project id that you've annotated when issuing the first connection string. Note: the project id is **NOT** the cluster id! +Use the project id that you've annotated when executing the first connection string. + +NOTE: the project id is **NOT** the cluster id! You're now ready to play with Camel K! diff --git a/pkg/trait/knative.go b/pkg/trait/knative.go index e4637460..b2cd9585 100644 --- a/pkg/trait/knative.go +++ b/pkg/trait/knative.go @@ -20,6 +20,8 @@ package trait import ( "encoding/json" "fmt" + "github.com/operator-framework/operator-sdk/pkg/sdk" + "github.com/pkg/errors" "strings" "github.com/apache/camel-k/pkg/apis/camel/v1alpha1" @@ -28,7 +30,6 @@ import ( knativeutil "github.com/apache/camel-k/pkg/util/knative" eventing "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" serving "github.com/knative/serving/pkg/apis/serving/v1alpha1" - "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -36,6 +37,7 @@ import ( type knativeTrait struct { BaseTrait `property:",squash"` Sources string `property:"sources"` + Sinks string `property:"sinks"` } func newKnativeTrait() *knativeTrait { @@ -50,9 +52,13 @@ func (t *knativeTrait) appliesTo(e *Environment) bool { func (t *knativeTrait) autoconfigure(e *Environment) error { if t.Sources == "" { - channels := getSourceChannels(e) + channels := t.getSourceChannels(e) t.Sources = strings.Join(channels, ",") } + if t.Sinks == "" { + channels := t.getSinkChannels(e) + t.Sinks = strings.Join(channels, ",") + } return nil } @@ -60,11 +66,15 @@ func (t *knativeTrait) apply(e *Environment) error { for _, sub := range t.getSubscriptionsFor(e) { e.Resources.Add(sub) } - e.Resources.Add(t.getServiceFor(e)) + svc, err := t.getServiceFor(e) + if err != nil { + return err + } + e.Resources.Add(svc) return nil } -func (t *knativeTrait) getServiceFor(e *Environment) *serving.Service { +func (t *knativeTrait) getServiceFor(e *Environment) (*serving.Service, error) { // combine properties of integration with context, integration // properties have the priority properties := CombineConfigurationAsMap("property", e.Context, e.Integration) @@ -102,7 +112,11 @@ func (t *knativeTrait) getServiceFor(e *Environment) *serving.Service { environment["AB_JOLOKIA_OFF"] = "true" // Knative integration - environment["CAMEL_KNATIVE_CONFIGURATION"] = t.getConfigurationSerialized(e) + conf, err := t.getConfigurationSerialized(e) + if err != nil { + return nil, err + } + environment["CAMEL_KNATIVE_CONFIGURATION"] = conf labels := map[string]string{ "camel.apache.org/integration": e.Integration.Name, @@ -135,11 +149,11 @@ func (t *knativeTrait) getServiceFor(e *Environment) *serving.Service { }, } - return &svc + return &svc, nil } func (t *knativeTrait) getSubscriptionsFor(e *Environment) []*eventing.Subscription { - channels := getConfiguredSourceChannels(t.Sources) + channels := t.getConfiguredSourceChannels() subs := make([]*eventing.Subscription, 0) for _, ch := range channels { subs = append(subs, t.getSubscriptionFor(e, ch)) @@ -174,19 +188,19 @@ func (*knativeTrait) getSubscriptionFor(e *Environment, channel string) *eventin } } -func (t *knativeTrait) getConfigurationSerialized(e *Environment) string { - env := t.getConfiguration(e) +func (t *knativeTrait) getConfigurationSerialized(e *Environment) (string, error) { + env, err := t.getConfiguration(e) res, err := json.Marshal(env) if err != nil { - logrus.Warning("Unable to serialize Knative configuration", err) - return "" + return "", errors.Wrap(err, "unable to serialize Knative configuration") } - return string(res) + return string(res), nil } -func (t *knativeTrait) getConfiguration(e *Environment) knativeutil.CamelEnvironment { - sourceChannels := getConfiguredSourceChannels(t.Sources) +func (t *knativeTrait) getConfiguration(e *Environment) (knativeutil.CamelEnvironment, error) { env := knativeutil.NewCamelEnvironment() + // Sources + sourceChannels := t.getConfiguredSourceChannels() for _, ch := range sourceChannels { svc := knativeutil.CamelServiceDefinition{ Name: ch, @@ -200,6 +214,29 @@ func (t *knativeTrait) getConfiguration(e *Environment) knativeutil.CamelEnviron } env.Services = append(env.Services, svc) } + // Sinks + sinkChannels := t.getConfiguredSinkChannels() + for _, ch := range sinkChannels { + channel, err := t.retrieveChannel(e.Integration.Namespace, ch) + if err != nil { + return env, err + } + hostname := channel.Status.Address.Hostname + if hostname == "" { + return env, errors.New("cannot find address of channel " + ch) + } + svc := knativeutil.CamelServiceDefinition{ + Name: ch, + Host: hostname, + Port: 80, + Protocol: knativeutil.CamelProtocolHTTP, + ServiceType: knativeutil.CamelServiceTypeChannel, + Metadata: map[string]string{ + knativeutil.CamelMetaServicePath: "/", + }, + } + env.Services = append(env.Services, svc) + } // Adding default endpoint defSvc := knativeutil.CamelServiceDefinition{ Name: "default", @@ -212,12 +249,12 @@ func (t *knativeTrait) getConfiguration(e *Environment) knativeutil.CamelEnviron }, } env.Services = append(env.Services, defSvc) - return env + return env, nil } -func getConfiguredSourceChannels(sources string) []string { +func (t *knativeTrait) getConfiguredSourceChannels() []string { channels := make([]string, 0) - for _, ch := range strings.Split(sources, ",") { + for _, ch := range strings.Split(t.Sources, ",") { cht := strings.Trim(ch, " \t\"") if cht != "" { channels = append(channels, cht) @@ -226,7 +263,7 @@ func getConfiguredSourceChannels(sources string) []string { return channels } -func getSourceChannels(e *Environment) []string { +func (*knativeTrait) getSourceChannels(e *Environment) []string { channels := make([]string, 0) metadata.Each(e.Integration.Spec.Sources, func(_ int, meta metadata.IntegrationMetadata) bool { @@ -236,3 +273,42 @@ func getSourceChannels(e *Environment) []string { return channels } + +func (t *knativeTrait) getConfiguredSinkChannels() []string { + channels := make([]string, 0) + for _, ch := range strings.Split(t.Sinks, ",") { + cht := strings.Trim(ch, " \t\"") + if cht != "" { + channels = append(channels, cht) + } + } + return channels +} + +func (*knativeTrait) getSinkChannels(e *Environment) []string { + channels := make([]string, 0) + + metadata.Each(e.Integration.Spec.Sources, func(_ int, meta metadata.IntegrationMetadata) bool { + channels = append(channels, knativeutil.ExtractChannelNames(meta.ToURIs)...) + return true + }) + + return channels +} + +func (*knativeTrait) retrieveChannel(namespace string, name string) (*eventing.Channel, error) { + channel := eventing.Channel{ + TypeMeta: metav1.TypeMeta{ + Kind: "Channel", + APIVersion: eventing.SchemeGroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: name, + }, + } + if err := sdk.Get(&channel); err != nil { + return nil, errors.Wrap(err, "could not retrieve channel " + name + " in namespace " + namespace) + } + return &channel, nil +} diff --git a/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java index eb3a1fdf..ce50bebf 100644 --- a/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java +++ b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java @@ -52,7 +52,6 @@ scheme = "knative", syntax = "knative:type/target", title = "Knative", - producerOnly = true, label = "cloud,eventing") public class KnativeEndpoint extends DefaultEndpoint implements DelegateEndpoint { @UriPath(description = "The Knative type") @@ -118,6 +117,9 @@ public Producer createProducer() throws Exception { headers.putIfAbsent("CE-EventTime", eventTime); headers.putIfAbsent("CE-Source", getEndpointUri()); headers.putIfAbsent(Exchange.CONTENT_TYPE, contentType); + + // Always remove host so it's always computed from the URL and not inherited from the exchange + headers.remove("Host"); }, endpoint.createProducer() ); @@ -235,6 +237,11 @@ private static Endpoint http(CamelContext context, ServiceDefinition definition) ); } + uri = URISupport.appendParametersToURI( + uri, + CollectionHelper.mapOf("useRelativePath", "true") + ); + return context.getEndpoint(uri); } catch (Exception e) { throw ObjectHelper.wrapRuntimeCamelException(e); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services