Hi Krishan,
What is your concept of a message? I mean, is it a fixed-length string,
length-delimited, etc.? You might want to look at PrefixedStringDecoder to
split the incoming stream into messages as they are received.
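
To make the length-delimited case concrete, here is a minimal sketch in plain Java (no MINA classes) of what such a decoder has to do: buffer whatever bytes a TCP read delivers and emit a message only once its full body has arrived. The class name LengthPrefixedFramer, the methods feed() and frame(), and the 4-byte big-endian length prefix are all assumptions made for illustration; they are not MINA's API and may not match your device protocol.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: splits a TCP byte stream into length-prefixed messages. */
class LengthPrefixedFramer {
    // Assumption: every message starts with a 4-byte big-endian body length.
    private static final int PREFIX_BYTES = 4;

    private final int maxMessageBytes;
    // Unread bytes left over from earlier reads (position..limit = unread data).
    private ByteBuffer pending = ByteBuffer.allocate(0);

    LengthPrefixedFramer(int maxMessageBytes) {
        this.maxMessageBytes = maxMessageBytes;
    }

    /** Feeds one chunk as read from the socket; returns the messages it completed. */
    List<String> feed(byte[] chunk) {
        // Append the new chunk to the leftover bytes.
        ByteBuffer merged = ByteBuffer.allocate(pending.remaining() + chunk.length);
        merged.put(pending).put(chunk);
        merged.flip();

        List<String> messages = new ArrayList<>();
        while (merged.remaining() >= PREFIX_BYTES) {
            // Peek at the length without consuming it.
            int length = merged.getInt(merged.position());
            if (length < 0 || length > maxMessageBytes) {
                // Reject absurd lengths so a bad peer cannot exhaust the heap.
                throw new IllegalStateException("Bad length prefix: " + length);
            }
            if (merged.remaining() < PREFIX_BYTES + length) {
                break; // body not fully received yet; wait for the next read
            }
            merged.getInt(); // consume the prefix
            byte[] body = new byte[length];
            merged.get(body);
            messages.add(new String(body, StandardCharsets.UTF_8));
        }
        pending = merged.slice(); // keep the incomplete tail for the next call
        return messages;
    }

    /** Encodes a message the way the sketch expects a device to send it. */
    static byte[] frame(String message) {
        byte[] body = message.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(PREFIX_BYTES + body.length)
                .putInt(body.length)
                .put(body)
                .array();
    }
}
```

One framer instance would be kept per connection, since leftover bytes belong to a single stream. Without framing of this kind, one read event may carry half a message or several messages at once, so counting read events does not count messages.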

On Wed, 19 Dec 2018 at 9:08 PM, Krishan Babbar <[email protected]>
wrote:

> Dear All,
>
> I am working on an IoT project. I have created a Java application using
> the MINA library. All the devices connect to this Java application over
> TCP.
> I have created a simulator (to simulate devices) for our load testing. In
> the simulator I have set a limit of 3000 threads in the thread pool so that
> one instance of the simulator can connect 3000 devices at once.
> I tried testing with 15000 devices by running 5 instances of the simulator
> in parallel. Each device sent a message every 30 seconds, and I had 10
> messages per device.
>
> But my Java application did not receive all the messages in one go; it is
> processing messages very slowly.
>
> Suppose the 5 simulators have sent (3000 devices/threads per instance * 5
> instances * 10 messages per device) 150000 messages in 5 minutes, but the
> Java application (with MINA) has processed only about 10000 messages.
> After some time my Java application even terminated due to a memory issue.
>
> Please help me understand how MINA works with a large number of
> device/socket connections.
> Where does MINA store messages that are yet to be processed?
>
> On a Linux machine with 16 GB RAM, with 4 GB and 6 GB given as the heap
> parameters of the Java application (java -Xms4096m -Xmx6144m -Dserver -jar
> Application.jar), how many devices/sockets can the MINA library handle?
>
> Does MINA use a single thread to connect all devices and receive packets
> from them?
> I mean, is it possible to receive all messages immediately in my request
> handler?
>
> My MINA configuration code is given below. From the request handler we
> start a new thread for each message for processing.
>
> public abstract class AbstractListener {
>
>     /** The logger instance. */
>     private static final Logger LOGGER = Logger.getLogger(AbstractListener.class);
>
>     /** The buffer size constant. */
>     protected static final int BUFFER_SIZE = 2048;
>
>     /**
>      * Initializes the UDP or TCP/IP listener.
>      *
>      * @param portKey
>      *            the port of the UDP or TCP/IP listener.
>      * @throws IOException
>      *             the I/O exception thrown in case of error.
>      */
>     public void init(String portKey) throws IOException {
>         LOGGER.info("Entering init()");
>         AbstractIoAcceptor acceptor = initInternal();
>         acceptor.setHandler(new RequestHandler());
>         acceptor.getFilterChain().addLast("logger", new LoggingFilter());
>         acceptor.getSessionConfig().setReadBufferSize(BUFFER_SIZE);
>         acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
>         if (acceptor instanceof NioDatagramAcceptor) {
>             ((NioDatagramAcceptor) acceptor).getSessionConfig().setReuseAddress(true);
>         }
>         acceptor.bind(new InetSocketAddress(
>                 Integer.parseInt(ServerConfigLoader.getInstance().getProperty(portKey))));
>         LOGGER.info("Exiting init()");
>     }
>
>     /**
>      * The abstract method to be overridden by the appropriate listener
>      * (UDP or TCP/IP) to create its acceptor.
>      *
>      * @return the appropriate (UDP or TCP/IP) I/O acceptor.
>      */
>     public abstract AbstractIoAcceptor initInternal();
> }
>
>
> public class RequestHandler extends IoHandlerAdapter {
>
>     /** The logger instance. */
>     private static final Logger LOGGER = LoggerFactory.getLogger(RequestHandler.class);
>
>     private ExecutorService executorService = Executors.newCachedThreadPool();
>
>     /**
>      * Default constructor.
>      */
>     public RequestHandler() {
>         super();
>     }
>
>     /**
>      * {@inheritDoc}
>      */
>     @Override
>     public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
>         LOGGER.error("Exception caught in RequestHandler.exceptionCaught()", cause);
>         session.closeNow();
>     }
>
>     /**
>      * {@inheritDoc}
>      */
>     @Override
>     public void messageReceived(IoSession session, Object message) throws Exception {
>         LOGGER.info("Entering messageReceived()");
>         try {
>             SocketAddress remoteAddress = session.getRemoteAddress();
>             String msg = ((IoBuffer) message).getHexDump().replaceAll(" ", "");
>             String[] strArray = remoteAddress.toString().split(":");
>             LOGGER.info("Client IP:Port & Message ["
>                     + InetAddress.getByName(strArray[0].split("/")[1])
>                     + ":" + strArray[1] + " -> " + msg + "]");
>             executorService.execute(new HelperThread(session, msg));
>         } catch (Exception e) {
>             LOGGER.error("Exception occurred in RequestHandler", e);
>             throw e;
>         } finally {
>             LOGGER.info("Exiting messageReceived()");
>         }
>     }
> }
>
>
> Thanks & Regards,
> Krishan Babbar
>
> ============================================================================================================================
> Disclaimer: This message and the information contained herein is
> proprietary and confidential and subject to the Tech Mahindra policy
> statement, you may review the policy at
> http://www.techmahindra.com/Disclaimer.html externally
> http://tim.techmahindra.com/tim/disclaimer.html internally within
> TechMahindra.
>
> ===========================================================================================================================
>
-- 
Raghavendra Balgi
Bangalore
Phone: +91 7795816719
Email: [email protected]
