This is an automated email from the ASF dual-hosted git repository. lburgazzoli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git
The following commit(s) were added to refs/heads/master by this push: new 654b2dd fix: add custom netty http binding to workaround CAMEL-13351 654b2dd is described below commit 654b2dd3c649cf1f104ae7efbe4be38529d18051 Author: lburgazzoli <lburgazz...@gmail.com> AuthorDate: Wed Mar 20 14:47:31 2019 +0100 fix: add custom netty http binding to workaround CAMEL-13351 The issue will be fixed upstream by #2832 on the Apache Camel repository, but as we support multiple Camel versions we still need this workaround to be sure the fix is applied. --- .../java/org/apache/camel/k/adapter/Objects.java | 27 ++++ .../java/org/apache/camel/k/adapter/Objects.java | 27 ++++ .../knative/http/KnativeHttpComponent.java | 170 +++++++++++++++++++-- 3 files changed, 212 insertions(+), 12 deletions(-) diff --git a/camel-k-adapter-camel-2/src/main/java/org/apache/camel/k/adapter/Objects.java b/camel-k-adapter-camel-2/src/main/java/org/apache/camel/k/adapter/Objects.java new file mode 100644 index 0000000..00e0bc5 --- /dev/null +++ b/camel-k-adapter-camel-2/src/main/java/org/apache/camel/k/adapter/Objects.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.k.adapter; + +import java.util.Iterator; + +import org.apache.camel.util.ObjectHelper; + +public final class Objects { + public static Iterator<?> createIterator(Object value, String delimiter, boolean allowEmptyValues) { + return ObjectHelper.createIterator(value, delimiter, allowEmptyValues); + } +} diff --git a/camel-k-adapter-camel-3/src/main/java/org/apache/camel/k/adapter/Objects.java b/camel-k-adapter-camel-3/src/main/java/org/apache/camel/k/adapter/Objects.java new file mode 100644 index 0000000..1911e4e --- /dev/null +++ b/camel-k-adapter-camel-3/src/main/java/org/apache/camel/k/adapter/Objects.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.k.adapter; + +import java.util.Iterator; + +import org.apache.camel.support.ObjectHelper; + +public final class Objects { + public static Iterator<?> createIterator(Object value, String delimiter, boolean allowEmptyValues) { + return ObjectHelper.createIterator(value, delimiter, allowEmptyValues); + } +} diff --git a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpComponent.java b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpComponent.java index d0567c7..301c181 100644 --- a/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpComponent.java +++ b/camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpComponent.java @@ -18,29 +18,47 @@ package org.apache.camel.component.knative.http; import java.net.URI; import java.nio.channels.ClosedChannelException; +import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; +import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.DefaultHttpResponse; import io.netty.handler.codec.http.HttpContent; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpHeaderValues; import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpVersion; import io.netty.util.Attribute; import io.netty.util.AttributeKey; +import org.apache.camel.Endpoint; import org.apache.camel.Exchange; +import org.apache.camel.Message; +import 
org.apache.camel.NoTypeConversionAvailableException; +import org.apache.camel.TypeConverter; +import org.apache.camel.component.netty4.NettyConverter; +import org.apache.camel.component.netty4.http.DefaultNettyHttpBinding; import org.apache.camel.component.netty4.http.HttpServerConsumerChannelFactory; +import org.apache.camel.component.netty4.http.NettyHttpBinding; import org.apache.camel.component.netty4.http.NettyHttpComponent; +import org.apache.camel.component.netty4.http.NettyHttpConfiguration; import org.apache.camel.component.netty4.http.NettyHttpConsumer; +import org.apache.camel.component.netty4.http.NettyHttpHelper; import org.apache.camel.component.netty4.http.handlers.HttpServerChannelHandler; import org.apache.camel.http.common.CamelServlet; import org.apache.camel.k.adapter.Exceptions; +import org.apache.camel.k.adapter.Objects; import org.apache.camel.k.adapter.Services; +import org.apache.camel.spi.HeaderFilterStrategy; import org.apache.camel.support.RestConsumerContextPathMatcher; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.URISupport; @@ -53,8 +71,14 @@ import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; public class KnativeHttpComponent extends NettyHttpComponent { + private static final Logger LOGGER = LoggerFactory.getLogger(KnativeHttpComponent.class); private final Map<Integer, HttpServerConsumerChannelFactory> handlers = new ConcurrentHashMap<>(); + public KnativeHttpComponent() { + super(); + setNettyHttpBinding(new KnativeNettyHttpBinding(getHeaderFilterStrategy())); + } + @Override public synchronized HttpServerConsumerChannelFactory getMultiplexChannelHandler(int port) { return handlers.computeIfAbsent(port, Handler::new); @@ -68,6 +92,11 @@ public class KnativeHttpComponent extends NettyHttpComponent { handlers.clear(); } + @Override + protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> 
parameters) throws Exception { + return super.createEndpoint(uri, remaining, parameters); + } + @ChannelHandler.Sharable private static class Handler extends SimpleChannelInboundHandler<Object> implements HttpServerConsumerChannelFactory { private static final Logger LOG = LoggerFactory.getLogger(Handler.class); @@ -190,18 +219,6 @@ public class KnativeHttpComponent extends NettyHttpComponent { // use the path as key to find the consumer handler to use path = pathAsKey(path); - /* - List<RestConsumerContextPathMatcher.ConsumerPath> paths = new ArrayList<>(); - for (final HttpServerChannelHandler handler : consumers) { - paths.add(new HttpRestConsumerPath(handler)); - } - - RestConsumerContextPathMatcher.ConsumerPath<HttpServerChannelHandler> best = RestConsumerContextPathMatcher.matchBestPath(method, path, paths); - if (best != null) { - answer = best.getConsumer(); - } - */ - // fallback to regular matching if (answer == null) { for (final HttpServerChannelHandler handler : consumers) { @@ -261,4 +278,133 @@ public class KnativeHttpComponent extends NettyHttpComponent { } } + + + /** + * Default {@link NettyHttpBinding}. 
+ */ + public class KnativeNettyHttpBinding extends DefaultNettyHttpBinding { + public KnativeNettyHttpBinding(HeaderFilterStrategy headerFilterStrategy) { + super(headerFilterStrategy); + } + + @Override + public HttpRequest toNettyRequest(Message message, String uri, NettyHttpConfiguration configuration) throws Exception { + LOGGER.trace("toNettyRequest: {}", message); + + // the message body may already be a Netty HTTP response + if (message.getBody() instanceof HttpRequest) { + return (HttpRequest) message.getBody(); + } + + String uriForRequest = uri; + if (configuration.isUseRelativePath()) { + uriForRequest = URISupport.pathAndQueryOf(new URI(uriForRequest)); + } + + // just assume GET for now, we will later change that to the actual method to use + HttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uriForRequest); + + Object body = message.getBody(); + if (body != null) { + // support bodies as native Netty + ByteBuf buffer; + if (body instanceof ByteBuf) { + buffer = (ByteBuf) body; + } else { + // try to convert to buffer first + buffer = message.getBody(ByteBuf.class); + if (buffer == null) { + // fallback to byte array as last resort + byte[] data = message.getMandatoryBody(byte[].class); + buffer = NettyConverter.toByteBuffer(data); + } + } + if (buffer != null) { + request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, uriForRequest, buffer); + int len = buffer.readableBytes(); + // set content-length + request.headers().set(HttpHeaderNames.CONTENT_LENGTH.toString(), len); + LOGGER.trace("Content-Length: {}", len); + } else { + // we do not support this kind of body + throw new NoTypeConversionAvailableException(body, ByteBuf.class); + } + } + + // update HTTP method accordingly as we know if we have a body or not + HttpMethod method = NettyHttpHelper.createMethod(message, body != null); + request.setMethod(method); + + TypeConverter tc = message.getExchange().getContext().getTypeConverter(); + 
+ // if we bridge endpoint then we need to skip matching headers with the HTTP_QUERY to avoid sending + // duplicated headers to the receiver, so use this skipRequestHeaders as the list of headers to skip + Map<String, Object> skipRequestHeaders = null; + if (configuration.isBridgeEndpoint()) { + String queryString = message.getHeader(Exchange.HTTP_QUERY, String.class); + if (queryString != null) { + skipRequestHeaders = URISupport.parseQuery(queryString, false, true); + } + // Need to remove the Host key as it should be not used + message.getHeaders().remove("host"); + } + + // append headers + // must use entrySet to ensure case of keys is preserved + for (Map.Entry<String, Object> entry : message.getHeaders().entrySet()) { + String key = entry.getKey(); + Object value = entry.getValue(); + + // we should not add headers for the parameters in the uri if we bridge the endpoint + // as then we would duplicate headers on both the endpoint uri, and in HTTP headers as well + if (skipRequestHeaders != null && skipRequestHeaders.containsKey(key)) { + continue; + } + + // use an iterator as there can be multiple values. (must not use a delimiter) + final Iterator<?> it = Objects.createIterator(value, null, true); + while (it.hasNext()) { + String headerValue = tc.convertTo(String.class, it.next()); + + if (headerValue != null && getHeaderFilterStrategy() != null + && !getHeaderFilterStrategy().applyFilterToCamelHeaders(key, headerValue, message.getExchange())) { + LOGGER.trace("HTTP-Header: {}={}", key, headerValue); + request.headers().add(key, headerValue); + } + } + } + + // set the content type in the response. 
+ String contentType = message.getHeader(Exchange.CONTENT_TYPE, String.class); + if (contentType != null) { + // set content-type + request.headers().set(HttpHeaderNames.CONTENT_TYPE.toString(), contentType); + LOGGER.trace("Content-Type: {}", contentType); + } + + // must include HOST header as required by HTTP 1.1 + // use URI as its faster than URL (no DNS lookup) + URI u = new URI(uri); + String hostHeader = u.getHost() + (u.getPort() == 80 ? "" : ":" + u.getPort()); + request.headers().set(HttpHeaderNames.HOST.toString(), hostHeader); + LOGGER.trace("Host: {}", hostHeader); + + // configure connection to accordingly to keep alive configuration + // favor using the header from the message + String connection = message.getHeader(HttpHeaderNames.CONNECTION.toString(), String.class); + if (connection == null) { + // fallback and use the keep alive from the configuration + if (configuration.isKeepAlive()) { + connection = HttpHeaderValues.KEEP_ALIVE.toString(); + } else { + connection = HttpHeaderValues.CLOSE.toString(); + } + } + request.headers().set(HttpHeaderNames.CONNECTION.toString(), connection); + LOGGER.trace("Connection: {}", connection); + + return request; + } + } }