CAMEL-6555 Fixed the UDP-related test errors
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/706d1b44 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/706d1b44 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/706d1b44 Branch: refs/heads/master Commit: 706d1b44bcad170508fa31e55d8b3ed4e4ef0bbd Parents: 5022509 Author: Willem Jiang <willem.ji...@gmail.com> Authored: Tue Jul 22 21:22:32 2014 +0800 Committer: Willem Jiang <willem.ji...@gmail.com> Committed: Tue Jul 22 21:25:20 2014 +0800 ---------------------------------------------------------------------- .../netty4/ChannelHandlerFactories.java | 92 +++++++++++++++----- .../component/netty4/NettyConfiguration.java | 16 ++-- .../camel/component/netty4/NettyHelper.java | 12 +-- .../component/netty4/NettyPayloadHelper.java | 18 ++++ .../camel/component/netty4/NettyProducer.java | 7 +- .../netty4/codec/DatagramPacketDecoder.java | 40 +++++++++ .../codec/DatagramPacketDelimiterDecoder.java | 47 ++++++++++ .../netty4/codec/DatagramPacketEncoder.java | 42 +++++++++ .../codec/DatagramPacketObjectDecoder.java | 51 +++++++++++ .../codec/DatagramPacketObjectEncoder.java | 51 +++++++++++ .../codec/DatagramPacketStringDecoder.java | 62 +++++++++++++ .../codec/DatagramPacketStringEncoder.java | 69 +++++++++++++++ .../codec/DelimiterBasedFrameDecoder.java | 34 ++++++++ .../component/netty4/codec/ObjectDecoder.java | 38 ++++++++ .../component/netty4/codec/ObjectEncoder.java | 34 ++++++++ .../netty4/handlers/ServerChannelHandler.java | 4 +- .../camel/component/netty4/MyCustomCodec.java | 2 +- .../netty4/NettyManualEndpointTest.java | 6 +- 18 files changed, 585 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/706d1b44/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ChannelHandlerFactories.java ---------------------------------------------------------------------- diff 
--git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ChannelHandlerFactories.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ChannelHandlerFactories.java index 65d1162..edd387f 100644 --- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ChannelHandlerFactories.java +++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ChannelHandlerFactories.java @@ -27,6 +27,13 @@ import io.netty.handler.codec.serialization.ObjectDecoder; import io.netty.handler.codec.serialization.ObjectEncoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; +import org.apache.camel.component.netty4.codec.DatagramPacketDecoder; +import org.apache.camel.component.netty4.codec.DatagramPacketDelimiterDecoder; +import org.apache.camel.component.netty4.codec.DatagramPacketEncoder; +import org.apache.camel.component.netty4.codec.DatagramPacketObjectDecoder; +import org.apache.camel.component.netty4.codec.DatagramPacketObjectEncoder; +import org.apache.camel.component.netty4.codec.DatagramPacketStringDecoder; +import org.apache.camel.component.netty4.codec.DatagramPacketStringEncoder; @@ -38,34 +45,73 @@ public final class ChannelHandlerFactories { private ChannelHandlerFactories() { } - public static ChannelHandlerFactory newStringEncoder(Charset charset) { - return new ShareableChannelHandlerFactory(new StringEncoder(charset)); + public static ChannelHandlerFactory newStringEncoder(Charset charset, String protocol) { + if ("udp".equalsIgnoreCase(protocol)) { + return new ShareableChannelHandlerFactory(new DatagramPacketStringEncoder(charset)); + } else { + return new ShareableChannelHandlerFactory(new StringEncoder(charset)); + } } - public static ChannelHandlerFactory newStringDecoder(Charset charset) { - return new ShareableChannelHandlerFactory(new StringDecoder(charset)); + public static ChannelHandlerFactory newStringDecoder(Charset 
charset, String protocol) { + if ("udp".equalsIgnoreCase(protocol)) { + return new ShareableChannelHandlerFactory(new DatagramPacketStringDecoder(charset)); + } else { + return new ShareableChannelHandlerFactory(new StringDecoder(charset)); + } } - - public static ChannelHandlerFactory newObjectDecoder() { - return new DefaultChannelHandlerFactory() { - @Override - public ChannelHandler newChannelHandler() { - return new ObjectDecoder(ClassResolvers.weakCachingResolver(null)); - } - }; + + + public static ChannelHandlerFactory newObjectDecoder(String protocol) { + if ("udp".equalsIgnoreCase(protocol)) { + return new DefaultChannelHandlerFactory() { + @Override + public ChannelHandler newChannelHandler() { + return new DatagramPacketObjectDecoder(ClassResolvers.weakCachingResolver(null)); + } + }; + } else { + return new DefaultChannelHandlerFactory() { + @Override + public ChannelHandler newChannelHandler() { + return new ObjectDecoder(ClassResolvers.weakCachingResolver(null)); + } + }; + } } - - public static ChannelHandlerFactory newObjectEncoder() { - return new ShareableChannelHandlerFactory(new ObjectEncoder()); + + public static ChannelHandlerFactory newObjectEncoder(String protocol) { + if ("udp".equals(protocol)) { + return new ShareableChannelHandlerFactory(new DatagramPacketObjectEncoder()); + } else { + return new ShareableChannelHandlerFactory(new ObjectEncoder()); + } } - - public static ChannelHandlerFactory newDelimiterBasedFrameDecoder(final int maxFrameLength, final ByteBuf[] delimiters) { - return new DefaultChannelHandlerFactory() { - @Override - public ChannelHandler newChannelHandler() { - return new DelimiterBasedFrameDecoder(maxFrameLength, true, delimiters); - } - }; + + public static ChannelHandlerFactory newDelimiterBasedFrameDecoder(final int maxFrameLength, final ByteBuf[] delimiters, String protocol) { + if ("udp".equals(protocol)) { + return new DefaultChannelHandlerFactory() { + @Override + public ChannelHandler newChannelHandler() { 
+ return new DatagramPacketDelimiterDecoder(maxFrameLength, delimiters); + } + }; + } else { + return new DefaultChannelHandlerFactory() { + @Override + public ChannelHandler newChannelHandler() { + return new DelimiterBasedFrameDecoder(maxFrameLength, true, delimiters); + } + }; + } + } + + public static ChannelHandlerFactory newDatagramPacketDecoder() { + return new ShareableChannelHandlerFactory(new DatagramPacketDecoder()); + } + + public static ChannelHandlerFactory newDatagramPacketEncoder() { + return new ShareableChannelHandlerFactory(new DatagramPacketEncoder()); } public static ChannelHandlerFactory newLengthFieldBasedFrameDecoder(final int maxFrameLength, final int lengthFieldOffset, http://git-wip-us.apache.org/repos/asf/camel/blob/706d1b44/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConfiguration.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConfiguration.java index 47b8af5..ffdc77d 100644 --- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConfiguration.java +++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConfiguration.java @@ -189,13 +189,16 @@ public class NettyConfiguration extends NettyServerBootstrapConfiguration implem // add default encoders and decoders if (encoders.isEmpty() && decoders.isEmpty()) { if (allowDefaultCodec) { + if ("udp".equalsIgnoreCase(protocol)) { + encoders.add(ChannelHandlerFactories.newDatagramPacketEncoder()); + } // are we textline or object? if (isTextline()) { Charset charset = getEncoding() != null ? 
Charset.forName(getEncoding()) : CharsetUtil.UTF_8; - encoders.add(ChannelHandlerFactories.newStringEncoder(charset)); + encoders.add(ChannelHandlerFactories.newStringEncoder(charset, protocol)); ByteBuf[] delimiters = delimiter == TextLineDelimiter.LINE ? Delimiters.lineDelimiter() : Delimiters.nulDelimiter(); - decoders.add(ChannelHandlerFactories.newDelimiterBasedFrameDecoder(decoderMaxLineLength, delimiters)); - decoders.add(ChannelHandlerFactories.newStringDecoder(charset)); + decoders.add(ChannelHandlerFactories.newDelimiterBasedFrameDecoder(decoderMaxLineLength, delimiters, protocol)); + decoders.add(ChannelHandlerFactories.newStringDecoder(charset, protocol)); if (LOG.isDebugEnabled()) { LOG.debug("Using textline encoders and decoders with charset: {}, delimiter: {} and decoderMaxLineLength: {}", @@ -203,11 +206,14 @@ public class NettyConfiguration extends NettyServerBootstrapConfiguration implem } } else { // object serializable is then used - encoders.add(ChannelHandlerFactories.newObjectEncoder()); - decoders.add(ChannelHandlerFactories.newObjectDecoder()); + encoders.add(ChannelHandlerFactories.newObjectEncoder(protocol)); + decoders.add(ChannelHandlerFactories.newObjectDecoder(protocol)); LOG.debug("Using object encoders and decoders"); } + if ("udp".equalsIgnoreCase(protocol)) { + decoders.add(ChannelHandlerFactories.newDatagramPacketDecoder()); + } } else { LOG.debug("No encoders and decoders will be used"); } http://git-wip-us.apache.org/repos/asf/camel/blob/706d1b44/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyHelper.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyHelper.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyHelper.java index 76f1f52..b345cca 100644 --- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyHelper.java +++ 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyHelper.java @@ -16,12 +16,14 @@ */ package org.apache.camel.component.netty4; +import java.net.InetSocketAddress; import java.net.SocketAddress; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; -import io.netty.channel.socket.DatagramPacket; +import io.netty.channel.DefaultAddressedEnvelope; + import org.apache.camel.Exchange; import org.apache.camel.NoTypeConversionAvailableException; import org.slf4j.Logger; @@ -91,10 +93,10 @@ public final class NettyHelper { if (log.isDebugEnabled()) { log.debug("Channel: {} remote address: {} writing body: {}", new Object[]{channel, remoteAddress, body}); } - System.out.println("The remote address is " + remoteAddress); - // TODO Do we need to setup the remoteAddress this time - //future = channel.write(body, remoteAddress); - future = channel.writeAndFlush(body); + // Need to create AddressedEnvelope to setup the address information here + DefaultAddressedEnvelope<Object, InetSocketAddress> ae = + new DefaultAddressedEnvelope<Object, InetSocketAddress>(body, (InetSocketAddress)remoteAddress); + future = channel.writeAndFlush(ae); } else { if (log.isDebugEnabled()) { log.debug("Channel: {} writing body: {}", new Object[]{channel, body}); http://git-wip-us.apache.org/repos/asf/camel/blob/706d1b44/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyPayloadHelper.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyPayloadHelper.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyPayloadHelper.java index dca12b2..f7deb21 100644 --- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyPayloadHelper.java +++ 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyPayloadHelper.java @@ -16,6 +16,10 @@ */ package org.apache.camel.component.netty4; +import java.net.InetSocketAddress; + +import io.netty.channel.AddressedEnvelope; + import org.apache.camel.Exchange; import org.apache.camel.impl.DefaultExchangeHolder; @@ -57,6 +61,13 @@ public final class NettyPayloadHelper { public static void setIn(Exchange exchange, Object payload) { if (payload instanceof DefaultExchangeHolder) { DefaultExchangeHolder.unmarshal(exchange, (DefaultExchangeHolder) payload); + } else if (payload instanceof AddressedEnvelope) { + @SuppressWarnings("unchecked") + AddressedEnvelope<Object, InetSocketAddress> dp = (AddressedEnvelope<Object, InetSocketAddress>)payload; + // need to take out the payload here + exchange.getIn().setBody(dp.content()); + // setup the sender address here for sending the response message back + exchange.setProperty(NettyConstants.NETTY_REMOTE_ADDRESS, dp.sender()); } else { // normal transfer using the body only exchange.getIn().setBody(payload); @@ -66,6 +77,13 @@ public final class NettyPayloadHelper { public static void setOut(Exchange exchange, Object payload) { if (payload instanceof DefaultExchangeHolder) { DefaultExchangeHolder.unmarshal(exchange, (DefaultExchangeHolder) payload); + } else if (payload instanceof AddressedEnvelope) { + @SuppressWarnings("unchecked") + AddressedEnvelope<Object, InetSocketAddress> dp = (AddressedEnvelope<Object, InetSocketAddress>)payload; + // need to take out the payload here + exchange.getOut().setBody(dp.content()); + // setup the sender address here for sending the response message back + exchange.setProperty(NettyConstants.NETTY_REMOTE_ADDRESS, dp.sender()); } else { // normal transfer using the body only and preserve the headers exchange.getOut().setHeaders(exchange.getIn().getHeaders()); 
http://git-wip-us.apache.org/repos/asf/camel/blob/706d1b44/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java index 1082acd..c0e5360 100644 --- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java +++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java @@ -228,9 +228,14 @@ public class NettyProducer extends DefaultAsyncProducer { // setup state as attachment on the channel, so we can access the state later when needed putState(channel, new NettyCamelState(producerCallback, exchange)); + // here we need to setup the remote address information here + InetSocketAddress remoteAddress = null; + if (!isTcp()) { + remoteAddress = new InetSocketAddress(configuration.getHost(), configuration.getPort()); + } // write body - NettyHelper.writeBodyAsync(LOG, channel, null, body, exchange, new ChannelFutureListener() { + NettyHelper.writeBodyAsync(LOG, channel, remoteAddress, body, exchange, new ChannelFutureListener() { public void operationComplete(ChannelFuture channelFuture) throws Exception { LOG.trace("Operation complete {}", channelFuture); if (!channelFuture.isSuccess()) { http://git-wip-us.apache.org/repos/asf/camel/blob/706d1b44/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketDecoder.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketDecoder.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketDecoder.java new file mode 100644 index 0000000..bd6d41f --- /dev/null +++ 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketDecoder.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty4.codec; + +import java.net.InetSocketAddress; +import java.util.List; + +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.DefaultAddressedEnvelope; +import io.netty.channel.socket.DatagramPacket; +import io.netty.handler.codec.MessageToMessageDecoder; + +@Sharable +public class DatagramPacketDecoder extends MessageToMessageDecoder<DatagramPacket> { + + @Override + protected void decode(ChannelHandlerContext ctx, DatagramPacket msg, List<Object> out) throws Exception { + // decode the DatagramPackage to AddressedEnvelope + DefaultAddressedEnvelope<Object, InetSocketAddress> addressEvelop = + new DefaultAddressedEnvelope<Object, InetSocketAddress>(msg.content().retain(), msg.recipient(), msg.sender()); + out.add(addressEvelop); + + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/706d1b44/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketDelimiterDecoder.java 
---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketDelimiterDecoder.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketDelimiterDecoder.java new file mode 100644 index 0000000..5d67756 --- /dev/null +++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketDelimiterDecoder.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.netty4.codec; + +import java.net.InetSocketAddress; +import java.util.List; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.AddressedEnvelope; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.DefaultAddressedEnvelope; +import io.netty.handler.codec.MessageToMessageDecoder; + +public class DatagramPacketDelimiterDecoder extends MessageToMessageDecoder<AddressedEnvelope<Object, InetSocketAddress>> { + private final DelimiterBasedFrameDecoder delegateDecoder; + + public DatagramPacketDelimiterDecoder(int maxFrameLength, ByteBuf[] delimiters) { + delegateDecoder = new DelimiterBasedFrameDecoder(maxFrameLength, true, delimiters); + } + @Override + protected void decode(ChannelHandlerContext ctx, AddressedEnvelope<Object, InetSocketAddress> msg, + List<Object> out) throws Exception { + if (msg.content() instanceof ByteBuf) { + ByteBuf payload = (ByteBuf)msg.content(); + Object result = delegateDecoder.decode(ctx, payload); + AddressedEnvelope<Object, InetSocketAddress> addressEvelop = + new DefaultAddressedEnvelope<Object, InetSocketAddress>(result, msg.recipient(), msg.sender()); + out.add(addressEvelop); + } + + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/706d1b44/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketEncoder.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketEncoder.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketEncoder.java new file mode 100644 index 0000000..c194763 --- /dev/null +++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketEncoder.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty4.codec; + +import java.net.InetSocketAddress; +import java.util.List; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.AddressedEnvelope; +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.socket.DatagramPacket; +import io.netty.handler.codec.MessageToMessageEncoder; + +@Sharable +public class DatagramPacketEncoder extends MessageToMessageEncoder<AddressedEnvelope<Object, InetSocketAddress>> { + + @Override + protected void encode(ChannelHandlerContext ctx, AddressedEnvelope<Object, InetSocketAddress> msg, List<Object> out) throws Exception { + if (msg.content() instanceof ByteBuf) { + ByteBuf payload = (ByteBuf)msg.content(); + // Just wrap the message as DatagramPacket, need to make sure the message content is ByteBuf + DatagramPacket dp = new DatagramPacket(payload.retain(), msg.recipient()); + out.add(dp); + } + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/706d1b44/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketObjectDecoder.java ---------------------------------------------------------------------- diff --git 
a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketObjectDecoder.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketObjectDecoder.java new file mode 100644 index 0000000..cd314ca --- /dev/null +++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketObjectDecoder.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.netty4.codec; + +import java.net.InetSocketAddress; +import java.util.List; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.AddressedEnvelope; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.DefaultAddressedEnvelope; +import io.netty.handler.codec.MessageToMessageDecoder; +import io.netty.handler.codec.serialization.ClassResolver; + + + +public class DatagramPacketObjectDecoder extends MessageToMessageDecoder<AddressedEnvelope<Object, InetSocketAddress>> { + + private final ObjectDecoder delegateDecoder; + + public DatagramPacketObjectDecoder(ClassResolver resolver) { + delegateDecoder = new ObjectDecoder(resolver); + } + + @Override + protected void decode(ChannelHandlerContext ctx, AddressedEnvelope<Object, InetSocketAddress> msg, + List<Object> out) throws Exception { + if (msg.content() instanceof ByteBuf) { + ByteBuf payload = (ByteBuf) msg.content(); + Object result = delegateDecoder.decode(ctx, payload); + AddressedEnvelope<Object, InetSocketAddress> addressedEnvelop = + new DefaultAddressedEnvelope<Object, InetSocketAddress>(result, msg.recipient(), msg.sender()); + out.add(addressedEnvelop); + } + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/706d1b44/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketObjectEncoder.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketObjectEncoder.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketObjectEncoder.java new file mode 100644 index 0000000..d57850e --- /dev/null +++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketObjectEncoder.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty4.codec; + +import java.io.Serializable; +import java.net.InetSocketAddress; +import java.util.List; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.AddressedEnvelope; +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.DefaultAddressedEnvelope; +import io.netty.handler.codec.MessageToMessageEncoder; + +@Sharable +public class DatagramPacketObjectEncoder extends + MessageToMessageEncoder<AddressedEnvelope<Object, InetSocketAddress>> { + private ObjectEncoder delegateObjectEncoder; + public DatagramPacketObjectEncoder() { + delegateObjectEncoder = new ObjectEncoder(); + } + @Override + protected void encode(ChannelHandlerContext ctx, AddressedEnvelope<Object, InetSocketAddress> msg, + List<Object> out) throws Exception { + if (msg.content() instanceof Serializable) { + Serializable payload = (Serializable) msg.content(); + ByteBuf buf = ctx.alloc().heapBuffer(); + delegateObjectEncoder.encode(ctx, payload, buf); + AddressedEnvelope<Object, InetSocketAddress> addressedEnvelop = + new DefaultAddressedEnvelope<Object, InetSocketAddress>(buf.retain(), msg.recipient(), msg.sender()); + out.add(addressedEnvelop); + } + + } + +} 
http://git-wip-us.apache.org/repos/asf/camel/blob/706d1b44/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketStringDecoder.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketStringDecoder.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketStringDecoder.java new file mode 100644 index 0000000..cd63d4a --- /dev/null +++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketStringDecoder.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.netty4.codec; + +import java.net.InetSocketAddress; +import java.nio.charset.Charset; +import java.util.List; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.AddressedEnvelope; +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.DefaultAddressedEnvelope; +import io.netty.handler.codec.MessageToMessageDecoder; + +@Sharable +public class DatagramPacketStringDecoder extends MessageToMessageDecoder<AddressedEnvelope<Object, InetSocketAddress>> { + + private final Charset charset; + + /** + * Creates a new instance with the current system character set. + */ + public DatagramPacketStringDecoder() { + this(Charset.defaultCharset()); + } + + /** + * Creates a new instance with the specified character set. + */ + public DatagramPacketStringDecoder(Charset charset) { + if (charset == null) { + throw new NullPointerException("charset"); + } + this.charset = charset; + } + + @Override + protected void decode(ChannelHandlerContext ctx, AddressedEnvelope<Object, InetSocketAddress> msg, List<Object> out) throws Exception { + if (msg.content() instanceof ByteBuf) { + ByteBuf payload = (ByteBuf)msg.content(); + AddressedEnvelope<Object, InetSocketAddress> addressedEnvelop = + new DefaultAddressedEnvelope<Object, InetSocketAddress>(payload.toString(charset), msg.recipient(), msg.sender()); + out.add(addressedEnvelop); + } + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/706d1b44/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketStringEncoder.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketStringEncoder.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketStringEncoder.java new file mode 100644 index 0000000..539056e --- /dev/null +++ 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketStringEncoder.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.netty4.codec; + +import java.net.InetSocketAddress; +import java.nio.CharBuffer; +import java.nio.charset.Charset; +import java.util.List; + +import io.netty.buffer.ByteBufUtil; +import io.netty.channel.AddressedEnvelope; +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.DefaultAddressedEnvelope; +import io.netty.handler.codec.MessageToMessageEncoder; + +@Sharable +public class DatagramPacketStringEncoder extends + MessageToMessageEncoder<AddressedEnvelope<Object, InetSocketAddress>> { + + private final Charset charset; + + /** + * Creates a new instance with the current system character set. + */ + public DatagramPacketStringEncoder() { + this(Charset.defaultCharset()); + } + + /** + * Creates a new instance with the specified character set. 
+ */ + public DatagramPacketStringEncoder(Charset charset) { + if (charset == null) { + throw new NullPointerException("charset"); + } + this.charset = charset; + } + + @Override + protected void encode(ChannelHandlerContext ctx, AddressedEnvelope<Object, InetSocketAddress> msg, + List<Object> out) throws Exception { + if (msg.content() instanceof CharSequence) { + CharSequence payload = (CharSequence)msg.content(); + if (payload.length() == 0) { + return; + } + AddressedEnvelope<Object, InetSocketAddress> addressedEnvelop = + new DefaultAddressedEnvelope<Object, InetSocketAddress>(ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(payload), charset), msg.recipient(), msg.sender()); + out.add(addressedEnvelop); + } + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/706d1b44/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DelimiterBasedFrameDecoder.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DelimiterBasedFrameDecoder.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DelimiterBasedFrameDecoder.java new file mode 100644 index 0000000..d6ededf --- /dev/null +++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DelimiterBasedFrameDecoder.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty4.codec; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; + +public class DelimiterBasedFrameDecoder extends io.netty.handler.codec.DelimiterBasedFrameDecoder { + + public DelimiterBasedFrameDecoder(int maxFrameLength, boolean stripDelimiter, ByteBuf[] delimiters) { + super(maxFrameLength, stripDelimiter, delimiters); + } + + @Override + public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { + return super.decode(ctx, in); + } + + +} http://git-wip-us.apache.org/repos/asf/camel/blob/706d1b44/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/ObjectDecoder.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/ObjectDecoder.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/ObjectDecoder.java new file mode 100644 index 0000000..c414a4e --- /dev/null +++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/ObjectDecoder.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty4.codec; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.serialization.ClassResolver; + + +/** + * Just expose the decode method for DatagramPacketObjectDecoder to use + */ +public class ObjectDecoder extends io.netty.handler.codec.serialization.ObjectDecoder { + + public ObjectDecoder(ClassResolver classResolver) { + super(classResolver); + } + + + public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { + return super.decode(ctx, in); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/706d1b44/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/ObjectEncoder.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/ObjectEncoder.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/ObjectEncoder.java new file mode 100644 index 0000000..c30042a --- /dev/null +++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/ObjectEncoder.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.netty4.codec; + +import java.io.Serializable; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; + + +/** + * Just expose the encode method for DatagramPacketObjectEncoder to use + */ +public class ObjectEncoder extends io.netty.handler.codec.serialization.ObjectEncoder { + @Override + public void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) throws Exception { + super.encode(ctx, msg, out); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/706d1b44/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java index 47dce7b..a03a99e 100644 --- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java +++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java @@ -21,10 +21,10 @@ import java.net.SocketAddress; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import 
io.netty.channel.SimpleChannelInboundHandler; - import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; +import org.apache.camel.component.netty4.NettyConstants; import org.apache.camel.component.netty4.NettyConsumer; import org.apache.camel.component.netty4.NettyHelper; import org.apache.camel.component.netty4.NettyPayloadHelper; @@ -172,7 +172,7 @@ public class ServerChannelHandler extends SimpleChannelInboundHandler<Object> { if (consumer.getConfiguration().isTcp()) { NettyHelper.writeBodyAsync(LOG, ctx.channel(), null, body, exchange, listener); } else { - NettyHelper.writeBodyAsync(LOG, ctx.channel(), ctx.channel().remoteAddress(), body, exchange, listener); + NettyHelper.writeBodyAsync(LOG, ctx.channel(), exchange.getProperty(NettyConstants.NETTY_REMOTE_ADDRESS, SocketAddress.class), body, exchange, listener); } } } http://git-wip-us.apache.org/repos/asf/camel/blob/706d1b44/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/MyCustomCodec.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/MyCustomCodec.java b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/MyCustomCodec.java index 4fe97de..be8e431 100644 --- a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/MyCustomCodec.java +++ b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/MyCustomCodec.java @@ -35,7 +35,7 @@ public final class MyCustomCodec { public static ChannelHandlerFactory createMyCustomDecoder() { ByteBuf[] delimiters = new ByteBuf[]{nullDelimiter, nullDelimiter}; - return ChannelHandlerFactories.newDelimiterBasedFrameDecoder(4096, delimiters); + return ChannelHandlerFactories.newDelimiterBasedFrameDecoder(4096, delimiters, "tcp"); } public static ChannelHandler createMyCustomDecoder2() { 
http://git-wip-us.apache.org/repos/asf/camel/blob/706d1b44/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyManualEndpointTest.java ---------------------------------------------------------------------- diff --git a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyManualEndpointTest.java b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyManualEndpointTest.java index 4707a61..6ed15eb 100644 --- a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyManualEndpointTest.java +++ b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyManualEndpointTest.java @@ -55,10 +55,10 @@ public class NettyManualEndpointTest extends BaseNettyTest { nettyConfig.setSync(false); // need to add encoders and decoders manually - nettyConfig.setEncoder(ChannelHandlerFactories.newStringEncoder(CharsetUtil.UTF_8)); + nettyConfig.setEncoder(ChannelHandlerFactories.newStringEncoder(CharsetUtil.UTF_8, "tcp")); List<ChannelHandler> decoders = new ArrayList<ChannelHandler>(); - decoders.add(ChannelHandlerFactories.newDelimiterBasedFrameDecoder(1000, Delimiters.lineDelimiter())); - decoders.add(ChannelHandlerFactories.newStringDecoder(CharsetUtil.UTF_8)); + decoders.add(ChannelHandlerFactories.newDelimiterBasedFrameDecoder(1000, Delimiters.lineDelimiter(), "tcp")); + decoders.add(ChannelHandlerFactories.newStringDecoder(CharsetUtil.UTF_8, "tcp")); nettyConfig.setDecoders(decoders); // create and start component