Writing more than one message will block until the MessageReceived as been 
fully proceced
-----------------------------------------------------------------------------------------

                 Key: DIRMINA-682
                 URL: https://issues.apache.org/jira/browse/DIRMINA-682
             Project: MINA
          Issue Type: Bug
    Affects Versions: 2.0.0-M4
            Reporter: Emmanuel Lecharny
            Priority: Blocker
             Fix For: 2.0.0-RC1


When a message generates mor ethan one response, then the written responses 
will be sent to the client only when the initial message has been totally 
processed.

Suppose that we receive one message M, it will be handled by a IoProcessor in 
the process() method, go through the chain to the IoHandler.MessageReceive() 
method. Now, if one want to write more than one response (session.write( R )), 
then those responses will be enqueued until we are back to the process() method.

The issue is due to the fact that the write is done using the IoProcessor 
associated to the current session, leading to a problem : we can't ask the 
IoProcessor instance to unqueue the written message until it is done with the 
current processing( it's running in one single thread).

The consequences are painfull :
- if one tries to write two responses, waiting for the first responses to be 
written, this will end with a DeadLock, as we are waiting on the processor we 
are holding
- if we don't care about waiting for the write to be done, then all the 
responses will be enqueued and stored in memory, until the IoProcessor exit 
from the read processing and start processing the writes, leading to OOM 
Exception

One solution would be to have to sets of IoProcessors, one for the read, and 
one for the writes. Or to pick a random Processor to process the writes, as 
soon as the processor is not the same as the one processing the reads.

Here is a sample exhibiting the problem. Just launch it, and use 'telnet 
localhost 8080' in a console, type something, it should write twice the typed 
message, but it just generates an exception - see further - and write back the 
message once. Removing the wait will work, but the messages will be sent only 
when the read has been processed in the AbstractPollingIoProcessor.process(T 
session) method :
    /**
     * Deal with session ready for the read or write operations, or both.
     */
    private void process(T session) {
        // Process Reads
        if (isReadable(session) && !session.isReadSuspended()) {
            read(session);
        }

        // Process writes
        if (isWritable(session) && !session.isWriteSuspended()) {
            scheduleFlush(session);
        }
    }

The sample code :

package org.apache.mina.real.life;

import java.net.InetSocketAddress;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.future.WriteFuture;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.SocketAcceptor;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;

/**
 * (<b>Entry point</b>) Echo server
 *
 * @author The Apache MINA Project (dev@mina.apache.org)
 */
public class Main {
    private static class EchoProtocolHandler extends IoHandlerAdapter {
        public void messageReceived(IoSession session, Object message)
                throws Exception {
            System.out.println(new String(((IoBuffer)message).array()));
            // Write the received data back to remote peer
            WriteFuture wf = session.write(((IoBuffer) message).duplicate());
            
            // Here, we will get a Deadlock detection
            wf.awaitUninterruptibly();
            
            // Do a second write
            session.write(((IoBuffer) message).duplicate());
        }
    }    
    /** Choose your favorite port number. */
    private static final int PORT = 8080;

    public static void main(String[] args) throws Exception {
        SocketAcceptor acceptor = new NioSocketAcceptor();
        
        // Add a logging filter
        acceptor.getFilterChain().addLast( "Logger", new LoggingFilter() );
        
        // Bind
        acceptor.setHandler(new EchoProtocolHandler());
        acceptor.bind(new InetSocketAddress(PORT));

        System.out.println("Listening on port " + PORT);
    }
}

The exception :

Listening on port 8080
[20:08:17] INFO [org.apache.mina.filter.logging.LoggingFilter] - CREATED
[20:08:17] INFO [org.apache.mina.filter.logging.LoggingFilter] - OPENED
[20:08:19] INFO [org.apache.mina.filter.logging.LoggingFilter] - RECEIVED: 
HeapBuffer[pos=0 lim=6 cap=2048: 74 65 73 74 0D 0A]
test

[20:08:30] WARN [org.apache.mina.filter.logging.LoggingFilter] - EXCEPTION :
java.lang.IllegalStateException: DEAD LOCK: IoFuture.await() was invoked from 
an I/O processor thread.  Please use IoFutureListener or configure a proper 
thread model alternatively.
        at 
org.apache.mina.core.future.DefaultIoFuture.checkDeadLock(DefaultIoFuture.java:235)
        at 
org.apache.mina.core.future.DefaultIoFuture.await0(DefaultIoFuture.java:203)
        at 
org.apache.mina.core.future.DefaultIoFuture.awaitUninterruptibly(DefaultIoFuture.java:131)
        at 
org.apache.mina.core.future.DefaultWriteFuture.awaitUninterruptibly(DefaultWriteFuture.java:114)
        at 
org.apache.mina.real.life.Main$EchoProtocolHandler.messageReceived(Main.java:45)
        at 
org.apache.mina.core.filterchain.DefaultIoFilterChain$TailFilter.messageReceived(DefaultIoFilterChain.java:722)
        at 
org.apache.mina.core.filterchain.DefaultIoFilterChain.callNextMessageReceived(DefaultIoFilterChain.java:434)
        at 
org.apache.mina.core.filterchain.DefaultIoFilterChain.access$1200(DefaultIoFilterChain.java:48)
        at 
org.apache.mina.core.filterchain.DefaultIoFilterChain$EntryImpl$1.messageReceived(DefaultIoFilterChain.java:802)
        at 
org.apache.mina.filter.logging.LoggingFilter.messageReceived(LoggingFilter.java:178)
        at 
org.apache.mina.core.filterchain.DefaultIoFilterChain.callNextMessageReceived(DefaultIoFilterChain.java:434)
        at 
org.apache.mina.core.filterchain.DefaultIoFilterChain.access$1200(DefaultIoFilterChain.java:48)
        at 
org.apache.mina.core.filterchain.DefaultIoFilterChain$EntryImpl$1.messageReceived(DefaultIoFilterChain.java:802)
        at 
org.apache.mina.core.filterchain.IoFilterAdapter.messageReceived(IoFilterAdapter.java:120)
        at 
org.apache.mina.core.filterchain.DefaultIoFilterChain.callNextMessageReceived(DefaultIoFilterChain.java:434)
        at 
org.apache.mina.core.filterchain.DefaultIoFilterChain.fireMessageReceived(DefaultIoFilterChain.java:426)
        at 
org.apache.mina.core.polling.AbstractPollingIoProcessor.read(AbstractPollingIoProcessor.java:604)
        at 
org.apache.mina.core.polling.AbstractPollingIoProcessor.process(AbstractPollingIoProcessor.java:564)
        at 
org.apache.mina.core.polling.AbstractPollingIoProcessor.process(AbstractPollingIoProcessor.java:553)
        at 
org.apache.mina.core.polling.AbstractPollingIoProcessor.access$400(AbstractPollingIoProcessor.java:57)
        at 
org.apache.mina.core.polling.AbstractPollingIoProcessor$Processor.run(AbstractPollingIoProcessor.java:892)
        at 
org.apache.mina.util.NamePreservingRunnable.run(NamePreservingRunnable.java:65)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:619)
[20:08:30] WARN [org.apache.mina.real.life.Main$EchoProtocolHandler] - 
EXCEPTION, please implement 
org.apache.mina.real.life.Main$EchoProtocolHandler.exceptionCaught() for proper 
handling:
java.lang.IllegalStateException: DEAD LOCK: IoFuture.await() was invoked from 
an I/O processor thread.  Please use IoFutureListener or configure a proper 
thread model alternatively.
        at 
org.apache.mina.core.future.DefaultIoFuture.checkDeadLock(DefaultIoFuture.java:235)
        at 
org.apache.mina.core.future.DefaultIoFuture.await0(DefaultIoFuture.java:203)
        at 
org.apache.mina.core.future.DefaultIoFuture.awaitUninterruptibly(DefaultIoFuture.java:131)
        at 
org.apache.mina.core.future.DefaultWriteFuture.awaitUninterruptibly(DefaultWriteFuture.java:114)
        at 
org.apache.mina.real.life.Main$EchoProtocolHandler.messageReceived(Main.java:45)
        at 
org.apache.mina.core.filterchain.DefaultIoFilterChain$TailFilter.messageReceived(DefaultIoFilterChain.java:722)
        at 
org.apache.mina.core.filterchain.DefaultIoFilterChain.callNextMessageReceived(DefaultIoFilterChain.java:434)
        at 
org.apache.mina.core.filterchain.DefaultIoFilterChain.access$1200(DefaultIoFilterChain.java:48)
        at 
org.apache.mina.core.filterchain.DefaultIoFilterChain$EntryImpl$1.messageReceived(DefaultIoFilterChain.java:802)
        at 
org.apache.mina.filter.logging.LoggingFilter.messageReceived(LoggingFilter.java:178)
        at 
org.apache.mina.core.filterchain.DefaultIoFilterChain.callNextMessageReceived(DefaultIoFilterChain.java:434)
        at 
org.apache.mina.core.filterchain.DefaultIoFilterChain.access$1200(DefaultIoFilterChain.java:48)
        at 
org.apache.mina.core.filterchain.DefaultIoFilterChain$EntryImpl$1.messageReceived(DefaultIoFilterChain.java:802)
        at 
org.apache.mina.core.filterchain.IoFilterAdapter.messageReceived(IoFilterAdapter.java:120)
        at 
org.apache.mina.core.filterchain.DefaultIoFilterChain.callNextMessageReceived(DefaultIoFilterChain.java:434)
        at 
org.apache.mina.core.filterchain.DefaultIoFilterChain.fireMessageReceived(DefaultIoFilterChain.java:426)
        at 
org.apache.mina.core.polling.AbstractPollingIoProcessor.read(AbstractPollingIoProcessor.java:604)
        at 
org.apache.mina.core.polling.AbstractPollingIoProcessor.process(AbstractPollingIoProcessor.java:564)
        at 
org.apache.mina.core.polling.AbstractPollingIoProcessor.process(AbstractPollingIoProcessor.java:553)
        at 
org.apache.mina.core.polling.AbstractPollingIoProcessor.access$400(AbstractPollingIoProcessor.java:57)
        at 
org.apache.mina.core.polling.AbstractPollingIoProcessor$Processor.run(AbstractPollingIoProcessor.java:892)
        at 
org.apache.mina.util.NamePreservingRunnable.run(NamePreservingRunnable.java:65)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:619)
[20:08:30] INFO [org.apache.mina.filter.logging.LoggingFilter] - SENT: 
HeapBuffer[pos=0 lim=6 cap=2048: 74 65 73 74 0D 0A]

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

Reply via email to