This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k.git

commit 1445a6f099268a28ffa10782c98ac87e21450bef
Author: nferraro <ni.ferr...@gmail.com>
AuthorDate: Mon Nov 26 17:07:58 2018 +0100

    Fix #218: allow to push to Knative
---
 pkg/trait/knative.go                               | 112 +++++++++++++++++----
 .../camel/component/knative/KnativeEndpoint.java   |   4 +-
 2 files changed, 97 insertions(+), 19 deletions(-)

diff --git a/pkg/trait/knative.go b/pkg/trait/knative.go
index e463746..b2cd958 100644
--- a/pkg/trait/knative.go
+++ b/pkg/trait/knative.go
@@ -20,6 +20,8 @@ package trait
 import (
        "encoding/json"
        "fmt"
+       "github.com/operator-framework/operator-sdk/pkg/sdk"
+       "github.com/pkg/errors"
        "strings"
 
        "github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
@@ -28,7 +30,6 @@ import (
        knativeutil "github.com/apache/camel-k/pkg/util/knative"
        eventing "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
        serving "github.com/knative/serving/pkg/apis/serving/v1alpha1"
-       "github.com/sirupsen/logrus"
        corev1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
@@ -36,6 +37,7 @@ import (
 type knativeTrait struct {
        BaseTrait `property:",squash"`
        Sources   string `property:"sources"`
+       Sinks     string `property:"sinks"`
 }
 
 func newKnativeTrait() *knativeTrait {
@@ -50,9 +52,13 @@ func (t *knativeTrait) appliesTo(e *Environment) bool {
 
 func (t *knativeTrait) autoconfigure(e *Environment) error {
        if t.Sources == "" {
-               channels := getSourceChannels(e)
+               channels := t.getSourceChannels(e)
                t.Sources = strings.Join(channels, ",")
        }
+       if t.Sinks == "" {
+               channels := t.getSinkChannels(e)
+               t.Sinks = strings.Join(channels, ",")
+       }
        return nil
 }
 
@@ -60,11 +66,15 @@ func (t *knativeTrait) apply(e *Environment) error {
        for _, sub := range t.getSubscriptionsFor(e) {
                e.Resources.Add(sub)
        }
-       e.Resources.Add(t.getServiceFor(e))
+       svc, err := t.getServiceFor(e)
+       if err != nil {
+               return err
+       }
+       e.Resources.Add(svc)
        return nil
 }
 
-func (t *knativeTrait) getServiceFor(e *Environment) *serving.Service {
+func (t *knativeTrait) getServiceFor(e *Environment) (*serving.Service, error) 
{
        // combine properties of integration with context, integration
        // properties have the priority
        properties := CombineConfigurationAsMap("property", e.Context, 
e.Integration)
@@ -102,7 +112,11 @@ func (t *knativeTrait) getServiceFor(e *Environment) 
*serving.Service {
        environment["AB_JOLOKIA_OFF"] = "true"
 
        // Knative integration
-       environment["CAMEL_KNATIVE_CONFIGURATION"] = 
t.getConfigurationSerialized(e)
+       conf, err := t.getConfigurationSerialized(e)
+       if err != nil {
+               return nil, err
+       }
+       environment["CAMEL_KNATIVE_CONFIGURATION"] = conf
 
        labels := map[string]string{
                "camel.apache.org/integration": e.Integration.Name,
@@ -135,11 +149,11 @@ func (t *knativeTrait) getServiceFor(e *Environment) 
*serving.Service {
                },
        }
 
-       return &svc
+       return &svc, nil
 }
 
 func (t *knativeTrait) getSubscriptionsFor(e *Environment) 
[]*eventing.Subscription {
-       channels := getConfiguredSourceChannels(t.Sources)
+       channels := t.getConfiguredSourceChannels()
        subs := make([]*eventing.Subscription, 0)
        for _, ch := range channels {
                subs = append(subs, t.getSubscriptionFor(e, ch))
@@ -174,19 +188,19 @@ func (*knativeTrait) getSubscriptionFor(e *Environment, 
channel string) *eventin
        }
 }
 
-func (t *knativeTrait) getConfigurationSerialized(e *Environment) string {
-       env := t.getConfiguration(e)
+func (t *knativeTrait) getConfigurationSerialized(e *Environment) (string, 
error) {
+       env, err := t.getConfiguration(e)
        res, err := json.Marshal(env)
        if err != nil {
-               logrus.Warning("Unable to serialize Knative configuration", err)
-               return ""
+               return "", errors.Wrap(err, "unable to serialize Knative 
configuration")
        }
-       return string(res)
+       return string(res), nil
 }
 
-func (t *knativeTrait) getConfiguration(e *Environment) 
knativeutil.CamelEnvironment {
-       sourceChannels := getConfiguredSourceChannels(t.Sources)
+func (t *knativeTrait) getConfiguration(e *Environment) 
(knativeutil.CamelEnvironment, error) {
        env := knativeutil.NewCamelEnvironment()
+       // Sources
+       sourceChannels := t.getConfiguredSourceChannels()
        for _, ch := range sourceChannels {
                svc := knativeutil.CamelServiceDefinition{
                        Name:        ch,
@@ -200,6 +214,29 @@ func (t *knativeTrait) getConfiguration(e *Environment) 
knativeutil.CamelEnviron
                }
                env.Services = append(env.Services, svc)
        }
+       // Sinks
+       sinkChannels := t.getConfiguredSinkChannels()
+       for _, ch := range sinkChannels {
+               channel, err := t.retrieveChannel(e.Integration.Namespace, ch)
+               if err != nil {
+                       return env, err
+               }
+               hostname := channel.Status.Address.Hostname
+               if hostname == "" {
+                       return env, errors.New("cannot find address of channel 
" + ch)
+               }
+               svc := knativeutil.CamelServiceDefinition{
+                       Name:        ch,
+                       Host:        hostname,
+                       Port:        80,
+                       Protocol:    knativeutil.CamelProtocolHTTP,
+                       ServiceType: knativeutil.CamelServiceTypeChannel,
+                       Metadata: map[string]string{
+                               knativeutil.CamelMetaServicePath: "/",
+                       },
+               }
+               env.Services = append(env.Services, svc)
+       }
        // Adding default endpoint
        defSvc := knativeutil.CamelServiceDefinition{
                Name:        "default",
@@ -212,12 +249,12 @@ func (t *knativeTrait) getConfiguration(e *Environment) 
knativeutil.CamelEnviron
                },
        }
        env.Services = append(env.Services, defSvc)
-       return env
+       return env, nil
 }
 
-func getConfiguredSourceChannels(sources string) []string {
+func (t *knativeTrait) getConfiguredSourceChannels() []string {
        channels := make([]string, 0)
-       for _, ch := range strings.Split(sources, ",") {
+       for _, ch := range strings.Split(t.Sources, ",") {
                cht := strings.Trim(ch, " \t\"")
                if cht != "" {
                        channels = append(channels, cht)
@@ -226,7 +263,7 @@ func getConfiguredSourceChannels(sources string) []string {
        return channels
 }
 
-func getSourceChannels(e *Environment) []string {
+func (*knativeTrait) getSourceChannels(e *Environment) []string {
        channels := make([]string, 0)
 
        metadata.Each(e.Integration.Spec.Sources, func(_ int, meta 
metadata.IntegrationMetadata) bool {
@@ -236,3 +273,42 @@ func getSourceChannels(e *Environment) []string {
 
        return channels
 }
+
+func (t *knativeTrait) getConfiguredSinkChannels() []string {
+       channels := make([]string, 0)
+       for _, ch := range strings.Split(t.Sinks, ",") {
+               cht := strings.Trim(ch, " \t\"")
+               if cht != "" {
+                       channels = append(channels, cht)
+               }
+       }
+       return channels
+}
+
+func (*knativeTrait) getSinkChannels(e *Environment) []string {
+       channels := make([]string, 0)
+
+       metadata.Each(e.Integration.Spec.Sources, func(_ int, meta 
metadata.IntegrationMetadata) bool {
+               channels = append(channels, 
knativeutil.ExtractChannelNames(meta.ToURIs)...)
+               return true
+       })
+
+       return channels
+}
+
+func (*knativeTrait) retrieveChannel(namespace string, name string) 
(*eventing.Channel, error) {
+       channel := eventing.Channel{
+               TypeMeta: metav1.TypeMeta{
+                       Kind:       "Channel",
+                       APIVersion: eventing.SchemeGroupVersion.String(),
+               },
+               ObjectMeta: metav1.ObjectMeta{
+                       Namespace: namespace,
+                       Name:      name,
+               },
+       }
+       if err := sdk.Get(&channel); err != nil {
+               return nil, errors.Wrap(err, "could not retrieve channel " + 
name + " in namespace " + namespace)
+       }
+       return &channel, nil
+}
diff --git 
a/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
 
b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
index eb3a1fd..c1b92cd 100644
--- 
a/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
+++ 
b/runtime/camel-knative/src/main/java/org/apache/camel/component/knative/KnativeEndpoint.java
@@ -52,7 +52,6 @@ import static org.apache.camel.util.ObjectHelper.ifNotEmpty;
     scheme = "knative",
     syntax = "knative:type/target",
     title = "Knative",
-    producerOnly = true,
     label = "cloud,eventing")
 public class KnativeEndpoint extends DefaultEndpoint implements 
DelegateEndpoint {
     @UriPath(description = "The Knative type")
@@ -118,6 +117,9 @@ public class KnativeEndpoint extends DefaultEndpoint 
implements DelegateEndpoint
                 headers.putIfAbsent("CE-EventTime", eventTime);
                 headers.putIfAbsent("CE-Source", getEndpointUri());
                 headers.putIfAbsent(Exchange.CONTENT_TYPE, contentType);
+
+                // Always remove host so it's always computed from the URL and 
not inherited from the exchange
+                headers.remove("Host");
             },
             endpoint.createProducer()
         );

Reply via email to