Hi Krishan,

What is your concept of a message? Is it a fixed-length string, length-delimited, or something else? You might want to look at PrefixedStringDecoder to split the incoming data into messages as they are received.
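To show why the framing question matters: TCP delivers a byte stream, so a single read may carry part of a message or several messages at once. Below is a minimal plain-Java sketch of length-delimited framing. It is not MINA code and not how PrefixedStringDecoder is implemented; the class name LengthPrefixedFramer, the 4-byte big-endian length prefix, and the UTF-8 payload are all assumptions made for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of MINA): splits a byte stream into length-prefixed messages. */
final class LengthPrefixedFramer {
    // Assumption: every message starts with a 4-byte big-endian payload length.
    private static final int PREFIX_LENGTH = 4;

    // Bytes received so far that do not yet form a complete message.
    private ByteBuffer pending = ByteBuffer.allocate(0);

    /** Encodes one message as [4-byte length][UTF-8 payload]. */
    static byte[] encode(String message) {
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        ByteBuffer out = ByteBuffer.allocate(PREFIX_LENGTH + payload.length);
        out.putInt(payload.length);
        out.put(payload);
        return out.array();
    }

    /** Feeds one received chunk and returns every message completed by it. */
    List<String> feed(byte[] chunk) {
        // Join the leftover bytes from earlier reads with the new chunk.
        ByteBuffer combined = ByteBuffer.allocate(pending.remaining() + chunk.length);
        combined.put(pending);
        combined.put(chunk);
        combined.flip();

        List<String> messages = new ArrayList<>();
        while (combined.remaining() >= PREFIX_LENGTH) {
            combined.mark();
            int length = combined.getInt();
            if (length < 0) {
                throw new IllegalArgumentException("Negative length prefix: " + length);
            }
            if (combined.remaining() < length) {
                combined.reset(); // payload incomplete, wait for more bytes
                break;
            }
            byte[] payload = new byte[length];
            combined.get(payload);
            messages.add(new String(payload, StandardCharsets.UTF_8));
        }
        // Keep the unread tail for the next call.
        pending = combined.slice();
        return messages;
    }
}
```

If the bytes for two encoded messages arrive split in the middle of the first one, the first feed() call returns an empty list and the second returns both messages. Without some framing like this, each messageReceived() call may see an arbitrary fragment of the stream rather than one device message.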
On Wed, 19 Dec 2018 at 9:08 PM, Krishan Babbar <[email protected]> wrote:

> Dear All,
>
> I am working on an IoT project. I have created a Java application using
> MINA library. All the devices are connecting to this Java application using
> TCP protocol.
> I have created simulator (to simulate devices) to do our load testing. In
> simulator I have given limit of 3000 threads in thread pool so that one
> instance of simulator can connect 3000 devices at once.
> I tried testing for 30000 devices by executing 5 instances of simulators
> in parallel. Each device was sending messages after interval of 30 seconds
> and I had 10 messages per device.
>
> But my Java Application did not receive all the messages in one go, it is
> processing messages very slow.
>
> Suppose 5 simulators have sent (3000 devices/threads per instance * 5
> instances * 5 mins) 150000 messages in 5 minutes but Java Application (with
> MINA) has processed about 10000 messages only.
> And even after sometime my Java Application was closed due to memory issue.
>
> Please help and guide how MINA is working in case of lot of
> devices/sockets connection?
> Where is MINA storing messages which are yet to process?
>
> With 16 GB RAM Linux machine, and giving 4 GB and 6 GB as a heap
> parameters in Java Application (java -Xms4096m -Xmx6144m -Dserver -jar
> Application.jar), how many devices/sockets MINA library can handle?
>
> Is MINA using single thread to connect all devices and receive packets
> from them?
> I mean is it possible to receive all messages immediately in my Request
> Handler?
>
> Below is given my code for MINA configuration and from request handler we
> are starting a new thread for each message for processing.
>
> public abstract class AbstractListener {
>
>     /** The logger instance.*/
>     private static Logger LOGGER = Logger.getLogger(AbstractListener.class);
>
>     /** The buffer size constant. */
>     protected static final int BUFFER_SIZE = 2048;
>
>     /**
>      * The method to initialize udp and tcp/ip listeners.
>      *
>      * @param portKey
>      *            the port of the udp or tcp/ip listener.
>      * @throws IOException
>      *             the io exception to be thrown in case of error.
>      */
>     public void init(String portKey) throws IOException {
>         LOGGER.info("Entering init()");
>         AbstractIoAcceptor acceptor = initInternal();
>         acceptor.setHandler(new RequestHandler());
>         acceptor.getFilterChain().addLast("logger", new LoggingFilter());
>         acceptor.getSessionConfig().setReadBufferSize(BUFFER_SIZE);
>         acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
>         if (acceptor instanceof NioDatagramAcceptor) {
>             ((NioDatagramAcceptor) acceptor).getSessionConfig().setReuseAddress(true);
>         }
>         acceptor.bind(new InetSocketAddress(Integer.parseInt(ServerConfigLoader.getInstance().getProperty(portKey))));
>         LOGGER.info("Exiting init()");
>     }
>
>     /**
>      * The abstract method to be over ridden by appropriate listener (udp /
>      * tcp/ip) for invoking appropriate listener.
>      *
>      * @return the appropriate (udp/ tcp/ip) io acceptor.
>      */
>     public abstract AbstractIoAcceptor initInternal();
> }
>
>
> public class RequestHandler extends IoHandlerAdapter {
>
>     /** The logger instance. */
>     private static final Logger LOGGER = LoggerFactory.getLogger(RequestHandler.class);
>
>     private ExecutorService executorService = Executors.newCachedThreadPool();
>
>     /**
>      * Default constructor.
>      */
>     public RequestHandler() {
>         super();
>     }
>
>     /**
>      * {@inheritDoc}
>      */
>     @Override
>     public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
>         LOGGER.error("Exception caught in RequestHandler.exceptionCaught()", cause);
>         session.closeNow();
>     }
>
>     /**
>      * {@inheritDoc}
>      */
>     @Override
>     public void messageReceived(IoSession session, Object message) throws Exception {
>         LOGGER.info("Entering messageReceived()");
>         try {
>             SocketAddress remoteAddress = session.getRemoteAddress();
>             String msg = ((IoBuffer)message).getHexDump().replaceAll(" ", "");
>             String strArray[] = remoteAddress.toString().split(":");
>             LOGGER.info("Client IP:Port & Message [" + InetAddress.getByName(strArray[0].split("/")[1]) + ":" + strArray[1] + " -> " + msg + "]");
>             executorService.execute(new HelperThread(session, msg));
>         } catch (Exception e) {
>             LOGGER.error("Exception occurred in RequestHandler", e);
>             throw e;
>         } finally {
>             LOGGER.info("Exiting messageReceived()");
>         }
>     }
> }
>
>
> Thanks & Regards,
> Krishan Babbar
>
> ============================================================================================================================
> Disclaimer: This message and the information contained herein is
> proprietary and confidential and subject to the Tech Mahindra policy
> statement, you may review the policy at
> http://www.techmahindra.com/Disclaimer.html externally
> http://tim.techmahindra.com/tim/disclaimer.html internally within
> TechMahindra.
> ===========================================================================================================================

--
Raghavendra Balgi
Bangalore
Phone: +91 7795816719
Email: [email protected]
