CAMEL-6555 Fixed the UDP related test errors

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/706d1b44
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/706d1b44
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/706d1b44

Branch: refs/heads/master
Commit: 706d1b44bcad170508fa31e55d8b3ed4e4ef0bbd
Parents: 5022509
Author: Willem Jiang <willem.ji...@gmail.com>
Authored: Tue Jul 22 21:22:32 2014 +0800
Committer: Willem Jiang <willem.ji...@gmail.com>
Committed: Tue Jul 22 21:25:20 2014 +0800

----------------------------------------------------------------------
 .../netty4/ChannelHandlerFactories.java         | 92 +++++++++++++++-----
 .../component/netty4/NettyConfiguration.java    | 16 ++--
 .../camel/component/netty4/NettyHelper.java     | 12 +--
 .../component/netty4/NettyPayloadHelper.java    | 18 ++++
 .../camel/component/netty4/NettyProducer.java   |  7 +-
 .../netty4/codec/DatagramPacketDecoder.java     | 40 +++++++++
 .../codec/DatagramPacketDelimiterDecoder.java   | 47 ++++++++++
 .../netty4/codec/DatagramPacketEncoder.java     | 42 +++++++++
 .../codec/DatagramPacketObjectDecoder.java      | 51 +++++++++++
 .../codec/DatagramPacketObjectEncoder.java      | 51 +++++++++++
 .../codec/DatagramPacketStringDecoder.java      | 62 +++++++++++++
 .../codec/DatagramPacketStringEncoder.java      | 69 +++++++++++++++
 .../codec/DelimiterBasedFrameDecoder.java       | 34 ++++++++
 .../component/netty4/codec/ObjectDecoder.java   | 38 ++++++++
 .../component/netty4/codec/ObjectEncoder.java   | 34 ++++++++
 .../netty4/handlers/ServerChannelHandler.java   |  4 +-
 .../camel/component/netty4/MyCustomCodec.java   |  2 +-
 .../netty4/NettyManualEndpointTest.java         |  6 +-
 18 files changed, 585 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/706d1b44/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ChannelHandlerFactories.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ChannelHandlerFactories.java
 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ChannelHandlerFactories.java
index 65d1162..edd387f 100644
--- 
a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ChannelHandlerFactories.java
+++ 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/ChannelHandlerFactories.java
@@ -27,6 +27,13 @@ import io.netty.handler.codec.serialization.ObjectDecoder;
 import io.netty.handler.codec.serialization.ObjectEncoder;
 import io.netty.handler.codec.string.StringDecoder;
 import io.netty.handler.codec.string.StringEncoder;
+import org.apache.camel.component.netty4.codec.DatagramPacketDecoder;
+import org.apache.camel.component.netty4.codec.DatagramPacketDelimiterDecoder;
+import org.apache.camel.component.netty4.codec.DatagramPacketEncoder;
+import org.apache.camel.component.netty4.codec.DatagramPacketObjectDecoder;
+import org.apache.camel.component.netty4.codec.DatagramPacketObjectEncoder;
+import org.apache.camel.component.netty4.codec.DatagramPacketStringDecoder;
+import org.apache.camel.component.netty4.codec.DatagramPacketStringEncoder;
 
 
 
@@ -38,34 +45,73 @@ public final class ChannelHandlerFactories {
     private ChannelHandlerFactories() {
     }
 
-    public static ChannelHandlerFactory newStringEncoder(Charset charset) {
-        return new ShareableChannelHandlerFactory(new StringEncoder(charset));
+    public static ChannelHandlerFactory newStringEncoder(Charset charset, 
String protocol) {
+        if ("udp".equalsIgnoreCase(protocol)) {
+            return new ShareableChannelHandlerFactory(new 
DatagramPacketStringEncoder(charset));
+        } else {
+            return new ShareableChannelHandlerFactory(new 
StringEncoder(charset));
+        }
     }
 
-    public static ChannelHandlerFactory newStringDecoder(Charset charset) {
-        return new ShareableChannelHandlerFactory(new StringDecoder(charset));
+    public static ChannelHandlerFactory newStringDecoder(Charset charset, 
String protocol) {
+        if ("udp".equalsIgnoreCase(protocol)) {
+            return new ShareableChannelHandlerFactory(new 
DatagramPacketStringDecoder(charset)); 
+        } else {
+            return new ShareableChannelHandlerFactory(new 
StringDecoder(charset));
+        }
     }
-
-    public static ChannelHandlerFactory newObjectDecoder() {
-        return new DefaultChannelHandlerFactory() {
-            @Override
-            public ChannelHandler newChannelHandler() {
-                return new 
ObjectDecoder(ClassResolvers.weakCachingResolver(null));
-            }
-        };
+    
+    
+    public static ChannelHandlerFactory newObjectDecoder(String protocol) {
+        if ("udp".equalsIgnoreCase(protocol)) {
+            return new DefaultChannelHandlerFactory() {
+                @Override
+                public ChannelHandler newChannelHandler() {
+                    return new 
DatagramPacketObjectDecoder(ClassResolvers.weakCachingResolver(null));
+                }
+            };
+        } else {
+            return new DefaultChannelHandlerFactory() {
+                @Override
+                public ChannelHandler newChannelHandler() {
+                    return new 
ObjectDecoder(ClassResolvers.weakCachingResolver(null));
+                }
+            };
+        }
     }
-
-    public static ChannelHandlerFactory newObjectEncoder() {
-        return new ShareableChannelHandlerFactory(new ObjectEncoder());
+    
+    public static ChannelHandlerFactory newObjectEncoder(String protocol) {
+        if ("udp".equalsIgnoreCase(protocol)) { // case-insensitive match, consistent with newObjectDecoder
+            return new ShareableChannelHandlerFactory(new 
DatagramPacketObjectEncoder());
+        } else {
+            return new ShareableChannelHandlerFactory(new ObjectEncoder());
+        }
     }
-
-    public static ChannelHandlerFactory newDelimiterBasedFrameDecoder(final 
int maxFrameLength, final ByteBuf[] delimiters) {
-        return new DefaultChannelHandlerFactory() {
-            @Override
-            public ChannelHandler newChannelHandler() {
-                return new DelimiterBasedFrameDecoder(maxFrameLength, true, 
delimiters);
-            }
-        };
+   
+    public static ChannelHandlerFactory newDelimiterBasedFrameDecoder(final 
int maxFrameLength, final ByteBuf[] delimiters, String protocol) {
+        if ("udp".equalsIgnoreCase(protocol)) { // case-insensitive match, consistent with newStringDecoder
+            return new DefaultChannelHandlerFactory() {
+                @Override
+                public ChannelHandler newChannelHandler() {
+                    return new DatagramPacketDelimiterDecoder(maxFrameLength, 
delimiters);
+                }
+            };
+        } else {
+            return new DefaultChannelHandlerFactory() {
+                @Override
+                public ChannelHandler newChannelHandler() {
+                    return new DelimiterBasedFrameDecoder(maxFrameLength, 
true, delimiters);
+                }
+            };
+        }
+    }
+    
+    public static ChannelHandlerFactory newDatagramPacketDecoder() {
+        return new ShareableChannelHandlerFactory(new DatagramPacketDecoder());
+    }
+    
+    public static ChannelHandlerFactory newDatagramPacketEncoder() {
+        return new ShareableChannelHandlerFactory(new DatagramPacketEncoder());
     }
 
     public static ChannelHandlerFactory newLengthFieldBasedFrameDecoder(final 
int maxFrameLength, final int lengthFieldOffset,

http://git-wip-us.apache.org/repos/asf/camel/blob/706d1b44/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConfiguration.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConfiguration.java
 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConfiguration.java
index 47b8af5..ffdc77d 100644
--- 
a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConfiguration.java
+++ 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyConfiguration.java
@@ -189,13 +189,16 @@ public class NettyConfiguration extends 
NettyServerBootstrapConfiguration implem
         // add default encoders and decoders
         if (encoders.isEmpty() && decoders.isEmpty()) {
             if (allowDefaultCodec) {
+                if ("udp".equalsIgnoreCase(protocol)) {
+                    
encoders.add(ChannelHandlerFactories.newDatagramPacketEncoder());
+                }
                 // are we textline or object?
                 if (isTextline()) {
                     Charset charset = getEncoding() != null ? 
Charset.forName(getEncoding()) : CharsetUtil.UTF_8;
-                    
encoders.add(ChannelHandlerFactories.newStringEncoder(charset));
+                    
encoders.add(ChannelHandlerFactories.newStringEncoder(charset, protocol));
                     ByteBuf[] delimiters = delimiter == TextLineDelimiter.LINE 
? Delimiters.lineDelimiter() : Delimiters.nulDelimiter();
-                    
decoders.add(ChannelHandlerFactories.newDelimiterBasedFrameDecoder(decoderMaxLineLength,
 delimiters));
-                    
decoders.add(ChannelHandlerFactories.newStringDecoder(charset));
+                    
decoders.add(ChannelHandlerFactories.newDelimiterBasedFrameDecoder(decoderMaxLineLength,
 delimiters, protocol));
+                    
decoders.add(ChannelHandlerFactories.newStringDecoder(charset, protocol));
 
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("Using textline encoders and decoders with 
charset: {}, delimiter: {} and decoderMaxLineLength: {}",
@@ -203,11 +206,14 @@ public class NettyConfiguration extends 
NettyServerBootstrapConfiguration implem
                     }
                 } else {
                     // object serializable is then used
-                    encoders.add(ChannelHandlerFactories.newObjectEncoder());
-                    decoders.add(ChannelHandlerFactories.newObjectDecoder());
+                    
encoders.add(ChannelHandlerFactories.newObjectEncoder(protocol));
+                    
decoders.add(ChannelHandlerFactories.newObjectDecoder(protocol));
 
                     LOG.debug("Using object encoders and decoders");
                 }
+                if ("udp".equalsIgnoreCase(protocol)) {
+                    
decoders.add(ChannelHandlerFactories.newDatagramPacketDecoder());
+                }
             } else {
                 LOG.debug("No encoders and decoders will be used");
             }

http://git-wip-us.apache.org/repos/asf/camel/blob/706d1b44/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyHelper.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyHelper.java
 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyHelper.java
index 76f1f52..b345cca 100644
--- 
a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyHelper.java
+++ 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyHelper.java
@@ -16,12 +16,14 @@
  */
 package org.apache.camel.component.netty4;
 
+import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.socket.DatagramPacket;
+import io.netty.channel.DefaultAddressedEnvelope;
+
 import org.apache.camel.Exchange;
 import org.apache.camel.NoTypeConversionAvailableException;
 import org.slf4j.Logger;
@@ -91,10 +93,10 @@ public final class NettyHelper {
             if (log.isDebugEnabled()) {
                 log.debug("Channel: {} remote address: {} writing body: {}", 
new Object[]{channel, remoteAddress, body});
             }
-            System.out.println("The remote address is " + remoteAddress);
-            // TODO Do we need to setup the remoteAddress this time
-            //future = channel.write(body, remoteAddress);
-            future = channel.writeAndFlush(body);
+            // Need to create AddressedEnvelope to setup the address 
information here
+            DefaultAddressedEnvelope<Object, InetSocketAddress> ae =
+                new DefaultAddressedEnvelope<Object, InetSocketAddress>(body, 
(InetSocketAddress)remoteAddress);
+            future = channel.writeAndFlush(ae);
         } else {
             if (log.isDebugEnabled()) {
                 log.debug("Channel: {} writing body: {}", new 
Object[]{channel, body});

http://git-wip-us.apache.org/repos/asf/camel/blob/706d1b44/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyPayloadHelper.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyPayloadHelper.java
 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyPayloadHelper.java
index dca12b2..f7deb21 100644
--- 
a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyPayloadHelper.java
+++ 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyPayloadHelper.java
@@ -16,6 +16,10 @@
  */
 package org.apache.camel.component.netty4;
 
+import java.net.InetSocketAddress;
+
+import io.netty.channel.AddressedEnvelope;
+
 import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultExchangeHolder;
 
@@ -57,6 +61,13 @@ public final class NettyPayloadHelper {
     public static void setIn(Exchange exchange, Object payload) {
         if (payload instanceof DefaultExchangeHolder) {
             DefaultExchangeHolder.unmarshal(exchange, (DefaultExchangeHolder) 
payload);
+        } else if (payload instanceof AddressedEnvelope) {
+            @SuppressWarnings("unchecked")
+            AddressedEnvelope<Object, InetSocketAddress> dp = 
(AddressedEnvelope<Object, InetSocketAddress>)payload; 
+            // need to take out the payload here
+            exchange.getIn().setBody(dp.content());
+            // setup the sender address here for sending the response message 
back
+            exchange.setProperty(NettyConstants.NETTY_REMOTE_ADDRESS, 
dp.sender());
         } else {
             // normal transfer using the body only
             exchange.getIn().setBody(payload);
@@ -66,6 +77,13 @@ public final class NettyPayloadHelper {
     public static void setOut(Exchange exchange, Object payload) {
         if (payload instanceof DefaultExchangeHolder) {
             DefaultExchangeHolder.unmarshal(exchange, (DefaultExchangeHolder) 
payload);
+        } else if (payload instanceof AddressedEnvelope) {
+            @SuppressWarnings("unchecked")
+            AddressedEnvelope<Object, InetSocketAddress> dp = 
(AddressedEnvelope<Object, InetSocketAddress>)payload; 
+            // need to take out the payload here 
+            exchange.getOut().setBody(dp.content());
+            // setup the sender address here for sending the response message 
back
+            exchange.setProperty(NettyConstants.NETTY_REMOTE_ADDRESS, 
dp.sender());
         } else {
             // normal transfer using the body only and preserve the headers
             exchange.getOut().setHeaders(exchange.getIn().getHeaders());

http://git-wip-us.apache.org/repos/asf/camel/blob/706d1b44/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
index 1082acd..c0e5360 100644
--- 
a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
+++ 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
@@ -228,9 +228,14 @@ public class NettyProducer extends DefaultAsyncProducer {
 
         // setup state as attachment on the channel, so we can access the 
state later when needed
         putState(channel, new NettyCamelState(producerCallback, exchange));
+        // here we need to setup the remote address information here
+        InetSocketAddress remoteAddress = null;
+        if (!isTcp()) {
+            remoteAddress = new InetSocketAddress(configuration.getHost(), 
configuration.getPort()); 
+        }
 
         // write body
-        NettyHelper.writeBodyAsync(LOG, channel, null, body, exchange, new 
ChannelFutureListener() {
+        NettyHelper.writeBodyAsync(LOG, channel, remoteAddress, body, 
exchange, new ChannelFutureListener() {
             public void operationComplete(ChannelFuture channelFuture) throws 
Exception {
                 LOG.trace("Operation complete {}", channelFuture);
                 if (!channelFuture.isSuccess()) {

http://git-wip-us.apache.org/repos/asf/camel/blob/706d1b44/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketDecoder.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketDecoder.java
 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketDecoder.java
new file mode 100644
index 0000000..bd6d41f
--- /dev/null
+++ 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketDecoder.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4.codec;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.DefaultAddressedEnvelope;
+import io.netty.channel.socket.DatagramPacket;
+import io.netty.handler.codec.MessageToMessageDecoder;
+
+@Sharable
+public class DatagramPacketDecoder extends 
MessageToMessageDecoder<DatagramPacket> {
+
+    @Override
+    protected void decode(ChannelHandlerContext ctx, DatagramPacket msg, 
List<Object> out) throws Exception {
+        // decode the DatagramPacket to an AddressedEnvelope (the content buffer is retained)
+        DefaultAddressedEnvelope<Object, InetSocketAddress> addressEvelop = 
+            new DefaultAddressedEnvelope<Object, 
InetSocketAddress>(msg.content().retain(), msg.recipient(), msg.sender());
+        out.add(addressEvelop);
+        
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/706d1b44/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketDelimiterDecoder.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketDelimiterDecoder.java
 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketDelimiterDecoder.java
new file mode 100644
index 0000000..5d67756
--- /dev/null
+++ 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketDelimiterDecoder.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4.codec;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.AddressedEnvelope;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.DefaultAddressedEnvelope;
+import io.netty.handler.codec.MessageToMessageDecoder;
+
+public class DatagramPacketDelimiterDecoder extends 
MessageToMessageDecoder<AddressedEnvelope<Object, InetSocketAddress>> {
+    private final DelimiterBasedFrameDecoder delegateDecoder;
+    
+    public DatagramPacketDelimiterDecoder(int maxFrameLength, ByteBuf[] 
delimiters) {
+        delegateDecoder = new DelimiterBasedFrameDecoder(maxFrameLength, true, 
delimiters);
+    }
+    @Override
+    protected void decode(ChannelHandlerContext ctx, AddressedEnvelope<Object, 
InetSocketAddress> msg,
+                          List<Object> out) throws Exception {
+        if (msg.content() instanceof ByteBuf) {
+            ByteBuf payload = (ByteBuf)msg.content();
+            Object result = delegateDecoder.decode(ctx, payload);
+            AddressedEnvelope<Object, InetSocketAddress> addressEvelop = 
+                new DefaultAddressedEnvelope<Object, 
InetSocketAddress>(result, msg.recipient(), msg.sender());
+            out.add(addressEvelop);
+        }
+        
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/706d1b44/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketEncoder.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketEncoder.java
 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketEncoder.java
new file mode 100644
index 0000000..c194763
--- /dev/null
+++ 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketEncoder.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4.codec;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.AddressedEnvelope;
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.socket.DatagramPacket;
+import io.netty.handler.codec.MessageToMessageEncoder;
+
+@Sharable
+public class DatagramPacketEncoder extends 
MessageToMessageEncoder<AddressedEnvelope<Object, InetSocketAddress>> {
+
+    @Override
+    protected void encode(ChannelHandlerContext ctx, AddressedEnvelope<Object, 
InetSocketAddress> msg, List<Object> out) throws Exception {
+        if (msg.content() instanceof ByteBuf) {
+            ByteBuf payload = (ByteBuf)msg.content();
+            // Just wrap the message as DatagramPacket, need to make sure the 
message content is ByteBuf
+            DatagramPacket dp = new DatagramPacket(payload.retain(), 
msg.recipient());
+            out.add(dp);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/706d1b44/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketObjectDecoder.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketObjectDecoder.java
 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketObjectDecoder.java
new file mode 100644
index 0000000..cd314ca
--- /dev/null
+++ 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketObjectDecoder.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4.codec;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.AddressedEnvelope;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.DefaultAddressedEnvelope;
+import io.netty.handler.codec.MessageToMessageDecoder;
+import io.netty.handler.codec.serialization.ClassResolver;
+
+
+
+public class DatagramPacketObjectDecoder extends 
MessageToMessageDecoder<AddressedEnvelope<Object, InetSocketAddress>> {
+
+    private final ObjectDecoder delegateDecoder;
+    
+    public DatagramPacketObjectDecoder(ClassResolver resolver) {
+        delegateDecoder = new ObjectDecoder(resolver);
+    }
+    
+    @Override
+    protected void decode(ChannelHandlerContext ctx, AddressedEnvelope<Object, 
InetSocketAddress> msg,
+                          List<Object> out) throws Exception {
+        if (msg.content() instanceof ByteBuf) {
+            ByteBuf payload = (ByteBuf) msg.content();
+            Object result = delegateDecoder.decode(ctx, payload);
+            AddressedEnvelope<Object, InetSocketAddress> addressedEnvelop = 
+                new DefaultAddressedEnvelope<Object, 
InetSocketAddress>(result, msg.recipient(), msg.sender());
+            out.add(addressedEnvelop);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/706d1b44/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketObjectEncoder.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketObjectEncoder.java
 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketObjectEncoder.java
new file mode 100644
index 0000000..d57850e
--- /dev/null
+++ 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketObjectEncoder.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4.codec;
+
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+import java.util.List;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.AddressedEnvelope;
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.DefaultAddressedEnvelope;
+import io.netty.handler.codec.MessageToMessageEncoder;
+
+@Sharable
+public class DatagramPacketObjectEncoder extends
+        MessageToMessageEncoder<AddressedEnvelope<Object, InetSocketAddress>> {
+    private ObjectEncoder delegateObjectEncoder;
+    public DatagramPacketObjectEncoder() {
+        delegateObjectEncoder = new ObjectEncoder();
+    }
+    @Override
+    protected void encode(ChannelHandlerContext ctx, AddressedEnvelope<Object, 
InetSocketAddress> msg,
+                          List<Object> out) throws Exception {
+        if (msg.content() instanceof Serializable) {
+            Serializable payload = (Serializable) msg.content();
+            ByteBuf buf = ctx.alloc().heapBuffer();
+            delegateObjectEncoder.encode(ctx, payload, buf);
+            AddressedEnvelope<Object, InetSocketAddress> addressedEnvelop = 
+                new DefaultAddressedEnvelope<Object, 
InetSocketAddress>(buf.retain(), msg.recipient(), msg.sender());
+            out.add(addressedEnvelop);
+        }
+        
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/706d1b44/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketStringDecoder.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketStringDecoder.java
 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketStringDecoder.java
new file mode 100644
index 0000000..cd63d4a
--- /dev/null
+++ 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketStringDecoder.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4.codec;
+
+import java.net.InetSocketAddress;
+import java.nio.charset.Charset;
+import java.util.List;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.AddressedEnvelope;
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.DefaultAddressedEnvelope;
+import io.netty.handler.codec.MessageToMessageDecoder;
+
+@Sharable
+public class DatagramPacketStringDecoder extends 
MessageToMessageDecoder<AddressedEnvelope<Object, InetSocketAddress>> {
+  
+    private final Charset charset;
+
+    /**
+     * Creates a new instance with the current system character set.
+     */
+    public DatagramPacketStringDecoder() {
+        this(Charset.defaultCharset());
+    }
+
+    /**
+     * Creates a new instance with the specified character set.
+     */
+    public DatagramPacketStringDecoder(Charset charset) {
+        if (charset == null) {
+            throw new NullPointerException("charset");
+        }
+        this.charset = charset;
+    }
+
+    @Override
+    protected void decode(ChannelHandlerContext ctx, AddressedEnvelope<Object, 
InetSocketAddress> msg, List<Object> out) throws Exception {
+        if (msg.content() instanceof ByteBuf) {
+            ByteBuf payload = (ByteBuf)msg.content();
+            AddressedEnvelope<Object, InetSocketAddress> addressedEnvelop = 
+                new DefaultAddressedEnvelope<Object, 
InetSocketAddress>(payload.toString(charset), msg.recipient(), msg.sender());
+            out.add(addressedEnvelop);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/706d1b44/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketStringEncoder.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketStringEncoder.java
 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketStringEncoder.java
new file mode 100644
index 0000000..539056e
--- /dev/null
+++ 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DatagramPacketStringEncoder.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.netty4.codec;
+
+import java.net.InetSocketAddress;
+import java.nio.CharBuffer;
+import java.nio.charset.Charset;
+import java.util.List;
+
+import io.netty.buffer.ByteBufUtil;
+import io.netty.channel.AddressedEnvelope;
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.DefaultAddressedEnvelope;
+import io.netty.handler.codec.MessageToMessageEncoder;
+
+/**
+ * Encodes the {@link CharSequence} content of an {@link AddressedEnvelope} into bytes
+ * using the configured {@link Charset}, keeping the envelope's recipient and sender.
+ */
+@Sharable
+public class DatagramPacketStringEncoder extends
+    MessageToMessageEncoder<AddressedEnvelope<Object, InetSocketAddress>> {
+
+    private final Charset charset;
+
+    /**
+     * Builds an encoder that uses {@link Charset#defaultCharset()}.
+     */
+    public DatagramPacketStringEncoder() {
+        this(Charset.defaultCharset());
+    }
+
+    /**
+     * Builds an encoder that uses the given character set.
+     *
+     * @param charset the character set used to encode the text
+     * @throws NullPointerException if {@code charset} is {@code null}
+     */
+    public DatagramPacketStringEncoder(Charset charset) {
+        if (charset == null) {
+            throw new NullPointerException("charset");
+        }
+        this.charset = charset;
+    }
+
+    /**
+     * Encodes the envelope content when it is a non-empty {@link CharSequence}; empty
+     * text or any other content adds nothing to {@code out}.
+     */
+    @Override
+    protected void encode(ChannelHandlerContext ctx, AddressedEnvelope<Object, InetSocketAddress> msg,
+                          List<Object> out) throws Exception {
+        if (!(msg.content() instanceof CharSequence)) {
+            return;
+        }
+        CharSequence text = (CharSequence) msg.content();
+        if (text.length() == 0) {
+            return;
+        }
+        out.add(new DefaultAddressedEnvelope<Object, InetSocketAddress>(
+            ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(text), charset),
+            msg.recipient(),
+            msg.sender()));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/706d1b44/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DelimiterBasedFrameDecoder.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DelimiterBasedFrameDecoder.java
 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DelimiterBasedFrameDecoder.java
new file mode 100644
index 0000000..d6ededf
--- /dev/null
+++ 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/DelimiterBasedFrameDecoder.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4.codec;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+
+/**
+ * Subclass of Netty's {@code DelimiterBasedFrameDecoder} that re-declares
+ * {@code decode} as {@code public} and forwards to the superclass implementation.
+ * NOTE(review): presumably so the datagram delimiter decoder can call it directly - confirm.
+ */
+public class DelimiterBasedFrameDecoder extends io.netty.handler.codec.DelimiterBasedFrameDecoder {
+
+    public DelimiterBasedFrameDecoder(int maxFrameLength, boolean stripDelimiter, ByteBuf[] delimiters) {
+        super(maxFrameLength, stripDelimiter, delimiters);
+    }
+
+    /**
+     * Delegates to the superclass {@code decode} and returns its result unchanged.
+     */
+    @Override
+    public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
+        final Object frame = super.decode(ctx, in);
+        return frame;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/706d1b44/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/ObjectDecoder.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/ObjectDecoder.java
 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/ObjectDecoder.java
new file mode 100644
index 0000000..c414a4e
--- /dev/null
+++ 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/ObjectDecoder.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4.codec;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.serialization.ClassResolver;
+
+
+/**
+ * Just expose the decode method for DatagramPacketObjectDecoder to use.
+ */
+public class ObjectDecoder extends io.netty.handler.codec.serialization.ObjectDecoder {
+
+    public ObjectDecoder(ClassResolver classResolver) {
+        super(classResolver);
+    }
+
+    /**
+     * Delegates to the superclass {@code decode} and returns its result unchanged.
+     * Marked {@code @Override} so the compiler verifies the signature still matches
+     * the superclass method, consistent with the other wrapper codecs in this package.
+     */
+    @Override
+    public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
+        return super.decode(ctx, in);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/706d1b44/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/ObjectEncoder.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/ObjectEncoder.java
 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/ObjectEncoder.java
new file mode 100644
index 0000000..c30042a
--- /dev/null
+++ 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/codec/ObjectEncoder.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty4.codec;
+
+import java.io.Serializable;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+
+
+/**
+ * Subclass of Netty's serialization {@code ObjectEncoder} whose only purpose is to
+ * make {@code encode} publicly callable, so that DatagramPacketObjectEncoder can use it.
+ */
+public class ObjectEncoder extends io.netty.handler.codec.serialization.ObjectEncoder {
+
+    /**
+     * Forwards to the superclass {@code encode} with the same arguments.
+     */
+    @Override
+    public void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) throws Exception {
+        super.encode(ctx, msg, out);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/706d1b44/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java
 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java
index 47dce7b..a03a99e 100644
--- 
a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java
+++ 
b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/handlers/ServerChannelHandler.java
@@ -21,10 +21,10 @@ import java.net.SocketAddress;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
-
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
+import org.apache.camel.component.netty4.NettyConstants;
 import org.apache.camel.component.netty4.NettyConsumer;
 import org.apache.camel.component.netty4.NettyHelper;
 import org.apache.camel.component.netty4.NettyPayloadHelper;
@@ -172,7 +172,7 @@ public class ServerChannelHandler extends 
SimpleChannelInboundHandler<Object> {
             if (consumer.getConfiguration().isTcp()) {
                 NettyHelper.writeBodyAsync(LOG, ctx.channel(), null, body, 
exchange, listener);
             } else {
-                NettyHelper.writeBodyAsync(LOG, ctx.channel(), 
ctx.channel().remoteAddress(), body, exchange, listener);
+                NettyHelper.writeBodyAsync(LOG, ctx.channel(), 
exchange.getProperty(NettyConstants.NETTY_REMOTE_ADDRESS, SocketAddress.class), 
body, exchange, listener);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/706d1b44/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/MyCustomCodec.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/MyCustomCodec.java
 
b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/MyCustomCodec.java
index 4fe97de..be8e431 100644
--- 
a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/MyCustomCodec.java
+++ 
b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/MyCustomCodec.java
@@ -35,7 +35,7 @@ public final class MyCustomCodec {
 
     public static ChannelHandlerFactory createMyCustomDecoder() {
         ByteBuf[] delimiters = new ByteBuf[]{nullDelimiter, nullDelimiter};
-        return ChannelHandlerFactories.newDelimiterBasedFrameDecoder(4096, 
delimiters);
+        return ChannelHandlerFactories.newDelimiterBasedFrameDecoder(4096, 
delimiters, "tcp");
     }
 
     public static ChannelHandler createMyCustomDecoder2() {

http://git-wip-us.apache.org/repos/asf/camel/blob/706d1b44/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyManualEndpointTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyManualEndpointTest.java
 
b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyManualEndpointTest.java
index 4707a61..6ed15eb 100644
--- 
a/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyManualEndpointTest.java
+++ 
b/components/camel-netty4/src/test/java/org/apache/camel/component/netty4/NettyManualEndpointTest.java
@@ -55,10 +55,10 @@ public class NettyManualEndpointTest extends BaseNettyTest {
                 nettyConfig.setSync(false);
 
                 // need to add encoders and decoders manually
-                
nettyConfig.setEncoder(ChannelHandlerFactories.newStringEncoder(CharsetUtil.UTF_8));
+                
nettyConfig.setEncoder(ChannelHandlerFactories.newStringEncoder(CharsetUtil.UTF_8,
 "tcp"));
                 List<ChannelHandler> decoders = new 
ArrayList<ChannelHandler>();
-                
decoders.add(ChannelHandlerFactories.newDelimiterBasedFrameDecoder(1000, 
Delimiters.lineDelimiter()));
-                
decoders.add(ChannelHandlerFactories.newStringDecoder(CharsetUtil.UTF_8));
+                
decoders.add(ChannelHandlerFactories.newDelimiterBasedFrameDecoder(1000, 
Delimiters.lineDelimiter(), "tcp"));
+                
decoders.add(ChannelHandlerFactories.newStringDecoder(CharsetUtil.UTF_8, 
"tcp"));
                 nettyConfig.setDecoders(decoders);
 
                 // create and start component

Reply via email to