How to set receiveBufferSizePredictor option in Camel-netty udp consumer server? ---------------------------------------------------------------------------------
Key: CAMEL-3607 URL: https://issues.apache.org/jira/browse/CAMEL-3607 Project: Camel Issue Type: Improvement Components: camel-netty Affects Versions: 2.6.0 Reporter: JungHo Cha Hello, I am trying to build the camel netty udp consumer. My spring XML like the below : <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring"> <jmxAgent id="agent" disabled="true" /> <route> <from uri="netty:udp://localhost:164?decoders=#decoders&sync=false" /> <to uri="log:com.brm?level=DEBUG" /> </route> </camelContext> <util:list id="decoders" list-class="java.util.LinkedList"> <bean class="com.brm.test.netty.NettyDecoder" /> </util:list> ... My test udp packet size is over 1000 bytes. When my udp client sent the udp packet, Netty consumer did not receive the full packet data, Instead netty decode() callback received only 768 bytes. And no another call happened. Then I googled this problem, So I found the receiveBufferSizePredictor field in DatagramChannelConfig Class was related received udp packet size. If receiveBufferSizePredictor is set bigger than the real udp packet, It might be that this problem can be cleared. But.. I cannot find this receiveBufferSizePredictor option in The Camel Netty options. And I have no idea this option where to insert after reviewing camel netty source too. 11:04:09,660 [DEBUG] [New I/O datagram worker #1'-'1] [NettyDecoder] (NettyDecoder.java:17) - decode(ChannelHandlerContext, Channel, ChannelBuffer) - start 11:04:09,660 [DEBUG] [New I/O datagram worker #1'-'1] [NettyDecoder] (NettyDecoder.java:20) - receivedBytes = 768 11:04:09,676 [DEBUG] [New I/O datagram worker #1'-'1] [ServerChannelHandler] (ServerChannelHandler.java:83) - Incoming message: 012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567 The above number 768 may be related the option "receiveBufferSizePredictor" I guess if the receiveBufferSizePredictor size is set over the expected received packet size, Netty decode method is fired when all data is received or receiveBufferSizePredictor size data is received. I think that netty developer thinks of the receiving memory usage optimazation. i.e. the receiveBufferSizePredictor is the key for tunning network and memory balance. Just from my guess... This receiveBufferSizePredictor option is really critical when camel netty udp consumer wants to receive some big size udp packet. I think this receiveBufferSizePredictor is included in the camel netty option. or it is happy if netty global or session options are accesible via camel netty method or injected camel netty configuration. Thank a lot. package com.brm.test.netty; import org.apache.log4j.Logger; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.handler.codec.frame.FrameDecoder; public class NettyDecoder extends FrameDecoder { private static final Logger logger = Logger.getLogger(NettyDecoder.class); @Override protected Object decode(ChannelHandlerContext channelhandlercontext, Channel channel, ChannelBuffer channelbuffer) throws Exception { logger.debug("decode(ChannelHandlerContext, Channel, ChannelBuffer) - start"); int receivedBytes = channelbuffer.readableBytes(); logger.debug("receivedBytes = " + receivedBytes); byte[] dst = new byte[receivedBytes]; channelbuffer.readBytes(dst); return new String(dst); } } ===================== package com.brm.test.netty; import java.io.IOException; import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetAddress; import java.net.UnknownHostException; public class UDPClient { public static void main(String args[]) { DatagramSocket dsock = null; InetAddress inetAddr = null; String localhost = "127.0.0.1"; int port = 164; int packetSize = 1000; String value = ""; try { inetAddr = InetAddress.getByName(localhost); dsock = new DatagramSocket(); for (int i = 0; i < packetSize; i++) { value += i % 10; } byte[] messageBytes = value.getBytes(); DatagramPacket sendLog = new DatagramPacket(messageBytes, messageBytes.length, inetAddr, port); dsock.send(sendLog); dsock.close(); } catch (UnknownHostException uhe) { System.out.println(uhe); } catch (IOException ie) { System.out.println(ie); } catch (Exception e) { System.out.println(e); } finally { if (dsock != null) { dsock.close(); } } } } ================================= package com.brm.test.netty; import org.apache.camel.spring.Main; public class UDPServer { private Main main; public static void main(String[] args) throws Exception { UDPServer udpServer = new UDPServer(); udpServer.boot(); } public void boot() throws Exception { main = new Main(); main.enableHangupSupport(); main.setApplicationContextUri("classpath:com/brm/test/netty/udp-test.xml"); System.out.println("Starting UDPServer... Use ctrl + c to terminate the JVM.\n"); main.run(); } } ==================================== <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:util="http://www.springframework.org/schema/util" xmlns:camel="http://camel.apache.org/schema/spring" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring-2.5.0.xsd"> <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring"> <jmxAgent id="agent" disabled="true" /> <route> <from uri="netty:udp://localhost:164?decoders=#decoders&sync=false" /> <to uri="log:com.brm?level=DEBUG" /> </route> </camelContext> <util:list id="decoders" list-class="java.util.LinkedList"> <bean class="com.brm.test.netty.NettyDecoder" /> </util:list> </beans> ================================= <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>CAMEL-NETTY-TEST</groupId> <artifactId>CAMEL-NETTY-TEST</artifactId> <version>0.0.1-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-netty</artifactId> <version>2.5.0</version> <scope>compile</scope> </dependency> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-spring</artifactId> <version>2.5.0</version> <scope>compile</scope> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.12</version> <type>jar</type> <scope>compile</scope> </dependency> </dependencies> </project> -- This message is automatically generated by JIRA. - For more information on JIRA, see: http://www.atlassian.com/software/jira