lburgazzoli closed pull request #247: Allow to push to Knative channels
URL: https://github.com/apache/camel-k/pull/247
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not show its diff
after the merge, so the diff is reproduced below:

diff --git a/docs/gke-setup.adoc b/docs/gke-setup.adoc
index 5aa35aa2..5bc39ebf 100644
--- a/docs/gke-setup.adoc
+++ b/docs/gke-setup.adoc
@@ -1,24 +1,24 @@
 [[gke-cluster]]
 Configuring a Google Kubernetes Engine (GKE) Cluster
-==============================
+====================================================
 
-This guide assumes you've already create a Kubernetes Engine cluster on 
https://console.cloud.google.com.
+This guide assumes you've already created a Kubernetes Engine cluster on 
https://console.cloud.google.com.
 
 Make sure you've selected a version of Kubernetes greater than **1.11** when 
creating the cluster. You can create it in any region.
 
-In the list of clusters for the current project, GKE provides a connection 
script that you need to execute on a shell to configure the `kubectl` command.
+In the list of clusters for the current project, GKE provides a connection 
string that you need to execute on a shell to configure the `kubectl` command.
 
-NOTE: the script contains a `--project` flag that indicates your **project 
ID**. You should keep that information for the last step.
+NOTE: the connection string contains a `--project` flag that indicates your 
**project ID**. You should keep that information for the last step.
 
-After executing the connection script, if everything is installed correctly, 
you should be able to execute:
+After executing the connection string, if everything is installed correctly, 
you should be able to execute:
 
 ```
 kubectl get pod
 ```
 
-When the cluster is first installed, you should find that no pods are present 
in the cluster. You can proceed with the installation then.
+When the cluster is first installed, you should find that "no pods are 
present" in the cluster. You can proceed with the installation then.
 
-Before installing Camel K on a fresh GKE cluster, you need to perform a extra 
step to give to your account the required cluster-admin permissions.
+Before installing Camel K on a fresh GKE cluster, you need to perform some 
extra steps to give to your account the required cluster-admin permissions.
 This means executing the following command (**replacing 
"your-address@gmail.com" with your account email address**):
 
 ```
@@ -36,25 +36,27 @@ The best way to obtain a valid key is from the web console:
 - To avoid confusion, it's suggested to use the "English" language in 
preferences of the Google Cloud console
 - Select "IAM & admin" from the navigation menu, then "Service accounts"
 - Create a new service account specifying the following id: 
**"camel-k-builder"**
-- You'll be asked to select a role. It's important to **select the **"Storage 
Admin" role** from the "Storage" menu
+- You'll be asked to select a role. It's important to select the **"Storage 
Admin" role** from the "Storage" menu
 - Finish creating the service account
 - From the action menu of the service account you've created, **create a key** 
using the JSON format
 
 A `.json` file with the key will be downloaded to your machine. You need to 
store that key in a Kubernetes secret.
 
 It's **important** to rename the file you've just downloaded to 
`kaniko-secret.json` (make sure you write it correctly).
-After the renaming, execute the following command:
+After the renaming, execute the following command to create the secret:
 
 ```
 kubectl create secret generic kaniko-secret --from-file=kaniko-secret.json
 ```
 
-You're ready to install Camel K. You should execute the following command to 
install it correctly:
+You're ready to install Camel K. You should now execute the following command 
to install cluster resources and the operator (in the current namespace):
 
 ```
 kamel install --registry gcr.io --organization <<your-project-id>> 
--push-secret kaniko-secret
 ```
 
-Use the project id that you've annotated when issuing the first connection 
string. Note: the project id is **NOT** the cluster id!
+Use the project id that you've annotated when executing the first connection 
string.
+
+NOTE: the project id is **NOT** the cluster id!
 
 You're now ready to play with Camel K!
diff --git a/pkg/trait/knative.go b/pkg/trait/knative.go
index e4637460..b2cd9585 100644
--- a/pkg/trait/knative.go
+++ b/pkg/trait/knative.go
@@ -20,6 +20,8 @@ package trait
 import (
        "encoding/json"
        "fmt"
+       "github.com/operator-framework/operator-sdk/pkg/sdk"
+       "github.com/pkg/errors"
        "strings"
 
        "github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
@@ -28,7 +30,6 @@ import (
        knativeutil "github.com/apache/camel-k/pkg/util/knative"
        eventing "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
        serving "github.com/knative/serving/pkg/apis/serving/v1alpha1"
-       "github.com/sirupsen/logrus"
        corev1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
@@ -36,6 +37,7 @@ import (
 type knativeTrait struct {
        BaseTrait `property:",squash"`
        Sources   string `property:"sources"`
+       Sinks     string `property:"sinks"`
 }
 
 func newKnativeTrait() *knativeTrait {
@@ -50,9 +52,13 @@ func (t *knativeTrait) appliesTo(e *Environment) bool {
 
 func (t *knativeTrait) autoconfigure(e *Environment) error {
        if t.Sources == "" {
-               channels := getSourceChannels(e)
+               channels := t.getSourceChannels(e)
                t.Sources = strings.Join(channels, ",")
        }
+       if t.Sinks == "" {
+               channels := t.getSinkChannels(e)
+               t.Sinks = strings.Join(channels, ",")
+       }
        return nil
 }
 
@@ -60,11 +66,15 @@ func (t *knativeTrait) apply(e *Environment) error {
        for _, sub := range t.getSubscriptionsFor(e) {
                e.Resources.Add(sub)
        }
-       e.Resources.Add(t.getServiceFor(e))
+       svc, err := t.getServiceFor(e)
+       if err != nil {
+               return err
+       }
+       e.Resources.Add(svc)
        return nil
 }
 
-func (t *knativeTrait) getServiceFor(e *Environment) *serving.Service {
+func (t *knativeTrait) getServiceFor(e *Environment) (*serving.Service, error) 
{
        // combine properties of integration with context, integration
        // properties have the priority
        properties := CombineConfigurationAsMap("property", e.Context, 
e.Integration)
@@ -102,7 +112,11 @@ func (t *knativeTrait) getServiceFor(e *Environment) 
*serving.Service {
        environment["AB_JOLOKIA_OFF"] = "true"
 
        // Knative integration
-       environment["CAMEL_KNATIVE_CONFIGURATION"] = 
t.getConfigurationSerialized(e)
+       conf, err := t.getConfigurationSerialized(e)
+       if err != nil {
+               return nil, err
+       }
+       environment["CAMEL_KNATIVE_CONFIGURATION"] = conf
 
        labels := map[string]string{
                "camel.apache.org/integration": e.Integration.Name,
@@ -135,11 +149,11 @@ func (t *knativeTrait) getServiceFor(e *Environment) 
*serving.Service {
                },
        }
 
-       return &svc
+       return &svc, nil
 }
 
 func (t *knativeTrait) getSubscriptionsFor(e *Environment) 
[]*eventing.Subscription {
-       channels := getConfiguredSourceChannels(t.Sources)
+       channels := t.getConfiguredSourceChannels()
        subs := make([]*eventing.Subscription, 0)
        for _, ch := range channels {
                subs = append(subs, t.getSubscriptionFor(e, ch))
@@ -174,19 +188,19 @@ func (*knativeTrait) getSubscriptionFor(e *Environment, 
channel string) *eventin
        }
 }
 
-func (t *knativeTrait) getConfigurationSerialized(e *Environment) string {
-       env := t.getConfiguration(e)
+func (t *knativeTrait) getConfigurationSerialized(e *Environment) (string, 
error) {
+       env, err := t.getConfiguration(e)
        res, err := json.Marshal(env)
        if err != nil {
-               logrus.Warning("Unable to serialize Knative configuration", err)
-               return ""
+               return "", errors.Wrap(err, "unable to serialize Knative 
configuration")
        }
-       return string(res)
+       return string(res), nil
 }
 
-func (t *knativeTrait) getConfiguration(e *Environment) 
knativeutil.CamelEnvironment {
-       sourceChannels := getConfiguredSourceChannels(t.Sources)
+func (t *knativeTrait) getConfiguration(e *Environment) 
(knativeutil.CamelEnvironment, error) {
        env := knativeutil.NewCamelEnvironment()
+       // Sources
+       sourceChannels := t.getConfiguredSourceChannels()
        for _, ch := range sourceChannels {
                svc := knativeutil.CamelServiceDefinition{
                        Name:        ch,
@@ -200,6 +214,29 @@ func (t *knativeTrait) getConfiguration(e *Environment) 
knativeutil.CamelEnviron
                }
                env.Services = append(env.Services, svc)
        }
+       // Sinks
+       sinkChannels := t.getConfiguredSinkChannels()
+       for _, ch := range sinkChannels {
+               channel, err := t.retrieveChannel(e.Integration.Namespace, ch)
+               if err != nil {
+                       return env, err
+               }
+               hostname := channel.Status.Address.Hostname
+               if hostname == "" {
+                       return env, errors.New("cannot find address of channel 
" + ch)
+               }
+               svc := knativeutil.CamelServiceDefinition{
+                       Name:        ch,
+                       Host:        hostname,
+                       Port:        80,
+                       Protocol:    knativeutil.CamelProtocolHTTP,
+                       ServiceType: knativeutil.CamelServiceTypeChannel,
+                       Metadata: map[string]string{
+                               knativeutil.CamelMetaServicePath: "/",
+                       },
+               }
+               env.Services = append(env.Services, svc)
+       }
        // Adding default endpoint
        defSvc := knativeutil.CamelServiceDefinition{
                Name:        "default",
@@ -212,12 +249,12 @@ func (t *knativeTrait) getConfiguration(e *Environment) 
knativeutil.CamelEnviron
                },
        }
        env.Services = append(env.Services, defSvc)
-       return env
+       return env, nil
 }
 
-func getConfiguredSourceChannels(sources string) []string {
+func (t *knativeTrait) getConfiguredSourceChannels() []string {
        channels := make([]string, 0)
-       for _, ch := range strings.Split(sources, ",") {
+       for _, ch := range strings.Split(t.Sources, ",") {
                cht := strings.Trim(ch, " \t\"")
                if cht != "" {
                        channels = append(channels, cht)
@@ -226,7 +263,7 @@ func getConfiguredSourceChannels(sources string) []string {
        return channels
 }
 
-func getSourceChannels(e *Environment) []string {
+func (*knativeTrait) getSourceChannels(e *Environment) []string {
        channels := make([]string, 0)
 
        metadata.Each(e.Integration.Spec.Sources, func(_ int, meta 
metadata.IntegrationMetadata) bool {
@@ -236,3 +273,42 @@ func getSourceChannels(e *Environment) []string {
 
        return channels
 }
+
+func (t *knativeTrait) getConfiguredSinkChannels() []string {
+       channels := make([]string, 0)
+       for _, ch := range strings.Split(t.Sinks, ",") {
+               cht := strings.Trim(ch, " \t\"")
+               if cht != "" {
+                       channels = append(channels, cht)
+               }
+       }
+       return channels
+}
+
+func (*knativeTrait) getSinkChannels(e *Environment) []string {
+       channels := make([]string, 0)
+
+       metadata.Each(e.Integration.Spec.Sources, func(_ int, meta 
metadata.IntegrationMetadata) bool {
+               channels = append(channels, 
knativeutil.ExtractChannelNames(meta.ToURIs)...)
+               return true
+       })
+
+       return channels
+}
+
+func (*knativeTrait) retrieveChannel(namespace string, name string) 
(*eventing.Channel, error) {
+       channel := eventing.Channel{
+               TypeMeta: metav1.TypeMeta{
+                       Kind:       "Channel",
+                       APIVersion: eventing.SchemeGroupVersion.String(),
+               },
+               ObjectMeta: metav1.ObjectMeta{
+                       Namespace: namespace,
+                       Name:      name,
+               },
+       }
+       if err := sdk.Get(&channel); err != nil {
+               return nil, errors.Wrap(err, "could not retrieve channel " + 
name + " in namespace " + namespace)
+       }
+       return &channel, nil
+}
diff --git 
a/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
 
b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
index eb3a1fdf..ce50bebf 100644
--- 
a/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
+++ 
b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
@@ -52,7 +52,6 @@
     scheme = "knative",
     syntax = "knative:type/target",
     title = "Knative",
-    producerOnly = true,
     label = "cloud,eventing")
 public class KnativeEndpoint extends DefaultEndpoint implements 
DelegateEndpoint {
     @UriPath(description = "The Knative type")
@@ -118,6 +117,9 @@ public Producer createProducer() throws Exception {
                 headers.putIfAbsent("CE-EventTime", eventTime);
                 headers.putIfAbsent("CE-Source", getEndpointUri());
                 headers.putIfAbsent(Exchange.CONTENT_TYPE, contentType);
+
+                // Always remove host so it's always computed from the URL and 
not inherited from the exchange
+                headers.remove("Host");
             },
             endpoint.createProducer()
         );
@@ -235,6 +237,11 @@ private static Endpoint http(CamelContext context, 
ServiceDefinition definition)
                 );
             }
 
+            uri = URISupport.appendParametersToURI(
+                    uri,
+                    CollectionHelper.mapOf("useRelativePath", "true")
+            );
+
             return context.getEndpoint(uri);
         } catch (Exception e) {
             throw ObjectHelper.wrapRuntimeCamelException(e);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to