[ 
https://issues.apache.org/jira/browse/DIRMINA-1057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16195937#comment-16195937
 ] 

Jonathan Valliere edited comment on DIRMINA-1057 at 10/7/17 11:24 PM:
----------------------------------------------------------------------

Okay, sorry for all the edits of the previous comment.

Here is the deal.  
* {{org.apache.mina.filter.codec.ProtocolCodecFilter}} produces two 
WriteRequest objects for every 1 input.
* {{DefaultIoFilterChain$HeadFilter#filterWrite}} only called 
{{increaseScheduledWriteMessages}} when {{getMessage()}} was not an instance 
of {{IoBuffer}}, which causes other problems because both File and IoBuffer 
requests are transferred down the {{fireMessageSent}} chain and cause the 
count to go negative as well.
* {{DefaultIoFilterChain#fireMessageSent}} discriminates against Encoded 
requests and will not allow them down the chain.
* {{AbstractPollingIoProcessor#writeBuffer}} contains a couple of errors when 
dealing with empty write requests and when null-checking for the existence of 
an OriginalRequest.

h3. *Fix HeadFilter*

{code:java}
public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest 
writeRequest) throws Exception
                {
                        AbstractIoSession s = (AbstractIoSession) session;

                        // Maintain counters.
                        if (writeRequest.getMessage() instanceof IoBuffer)
                        {
                                IoBuffer buffer = (IoBuffer) 
writeRequest.getMessage();
                                // I/O processor implementation will call 
buffer.reset()
                                // it after the write operation is finished, 
because
                                // the buffer will be specified with 
messageSent event.
                                buffer.mark();
                                int remaining = buffer.remaining();

                                if (remaining > 0)
                                {
                                        
s.increaseScheduledWriteBytes(remaining);
                                }
                        }

                        if (!writeRequest.isEncoded())
                        {
                                s.increaseScheduledWriteMessages();
                        }

                        WriteRequestQueue writeRequestQueue = 
s.getWriteRequestQueue();

                        if (!s.isWriteSuspended())
                        {
                                if (writeRequestQueue.isEmpty(session))
                                {
                                        // We can write directly the message
                                        s.getProcessor().write(s, writeRequest);
                                }
                                else
                                {
                                        s.getWriteRequestQueue().offer(s, 
writeRequest);
                                        s.getProcessor().flush(s);
                                }
                        }
                        else
                        {
                                s.getWriteRequestQueue().offer(s, writeRequest);
                        }
                }
{code}

h3. *Fix AbstractPollingIoProcessor*

{code:java}
private int writeBuffer(S session, WriteRequest req, boolean hasFragmentation, 
int maxLength, long currentTime) throws Exception
        {
                IoBuffer buf = (IoBuffer) req.getMessage();
                int localWrittenBytes = 0;

                if (buf.hasRemaining())
                {
                        int length;

                        if (hasFragmentation)
                        {
                                length = Math.min(buf.remaining(), maxLength);
                        }
                        else
                        {
                                length = buf.remaining();
                        }

                        try
                        {
                                localWrittenBytes = this.write(session, buf, 
length);
                        }
                        catch (IOException ioe)
                        {
                                ioe.printStackTrace();

                                // We have had an issue while trying to send 
data to the
                                // peer : let's close the session.
                                buf.free();
                                session.closeNow();
                                this.removeNow(session);

                                return 0;
                        }

                        session.increaseWrittenBytes(localWrittenBytes, 
currentTime);

                        // Now, forward the original message
                        if (!buf.hasRemaining() || (!hasFragmentation && 
(localWrittenBytes != 0)))
                        {
                                WriteRequest originalRequest = 
req.getOriginalRequest();

                                if (originalRequest != null)
                                {
                                        Object originalMessage = 
req.getOriginalRequest().getMessage();

                                        if (originalMessage instanceof IoBuffer)
                                        {
                                                buf = ((IoBuffer) 
req.getOriginalRequest().getMessage());

                                                int pos = buf.position();
                                                buf.reset();
                                                this.fireMessageSent(session, 
req);
                                                // And set it back to its 
position
                                                buf.position(pos);
                                        }
                                        else
                                        {
                                                this.fireMessageSent(session, 
req);
                                        }
                                }
                                else
                                {
                                        this.fireMessageSent(session, req);
                                }
                        }
                }
                else
                {
                        this.fireMessageSent(session, req);
                }

                return localWrittenBytes;
        }
{code}




was (Author: johnnyv):
Okay, sorry for all the edits of the previous comment.

Here is the deal.  
* {{org.apache.mina.filter.codec.ProtocolCodecFilter}} produces two 
WriteRequest objects for every 1 input.
* {{DefaultIoFilterChain$HeadFilter#filterWrite}} only called 
{{increaseScheduledWriteMessages}} when {{getMessage()}} was not an instance 
of {{IoBuffer}}, which causes other problems because File requests are 
transferred down the {{fireMessageSent}} chain and cause the count to go 
negative as well.
* {{DefaultIoFilterChain#fireMessageSent}} discriminates against Encoded 
requests and will not allow them down the chain.
* {{AbstractPollingIoProcessor#writeBuffer}} contains a couple of errors when 
dealing with empty write requests and when null-checking for the existence of 
an OriginalRequest.

h3. *Fix HeadFilter*

{code:java}
public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest 
writeRequest) throws Exception
                {
                        AbstractIoSession s = (AbstractIoSession) session;

                        // Maintain counters.
                        if (writeRequest.getMessage() instanceof IoBuffer)
                        {
                                IoBuffer buffer = (IoBuffer) 
writeRequest.getMessage();
                                // I/O processor implementation will call 
buffer.reset()
                                // it after the write operation is finished, 
because
                                // the buffer will be specified with 
messageSent event.
                                buffer.mark();
                                int remaining = buffer.remaining();

                                if (remaining > 0)
                                {
                                        
s.increaseScheduledWriteBytes(remaining);
                                }
                        }

                        if (!writeRequest.isEncoded())
                        {
                                s.increaseScheduledWriteMessages();
                        }

                        WriteRequestQueue writeRequestQueue = 
s.getWriteRequestQueue();

                        if (!s.isWriteSuspended())
                        {
                                if (writeRequestQueue.isEmpty(session))
                                {
                                        // We can write directly the message
                                        s.getProcessor().write(s, writeRequest);
                                }
                                else
                                {
                                        s.getWriteRequestQueue().offer(s, 
writeRequest);
                                        s.getProcessor().flush(s);
                                }
                        }
                        else
                        {
                                s.getWriteRequestQueue().offer(s, writeRequest);
                        }
                }
{code}

h3. *Fix AbstractPollingIoProcessor*

{code:java}
private int writeBuffer(S session, WriteRequest req, boolean hasFragmentation, 
int maxLength, long currentTime) throws Exception
        {
                IoBuffer buf = (IoBuffer) req.getMessage();
                int localWrittenBytes = 0;

                if (buf.hasRemaining())
                {
                        int length;

                        if (hasFragmentation)
                        {
                                length = Math.min(buf.remaining(), maxLength);
                        }
                        else
                        {
                                length = buf.remaining();
                        }

                        try
                        {
                                localWrittenBytes = this.write(session, buf, 
length);
                        }
                        catch (IOException ioe)
                        {
                                ioe.printStackTrace();

                                // We have had an issue while trying to send 
data to the
                                // peer : let's close the session.
                                buf.free();
                                session.closeNow();
                                this.removeNow(session);

                                return 0;
                        }

                        session.increaseWrittenBytes(localWrittenBytes, 
currentTime);

                        // Now, forward the original message
                        if (!buf.hasRemaining() || (!hasFragmentation && 
(localWrittenBytes != 0)))
                        {
                                WriteRequest originalRequest = 
req.getOriginalRequest();

                                if (originalRequest != null)
                                {
                                        Object originalMessage = 
req.getOriginalRequest().getMessage();

                                        if (originalMessage instanceof IoBuffer)
                                        {
                                                buf = ((IoBuffer) 
req.getOriginalRequest().getMessage());

                                                int pos = buf.position();
                                                buf.reset();
                                                this.fireMessageSent(session, 
req);
                                                // And set it back to its 
position
                                                buf.position(pos);
                                        }
                                        else
                                        {
                                                this.fireMessageSent(session, 
req);
                                        }
                                }
                                else
                                {
                                        this.fireMessageSent(session, req);
                                }
                        }
                }
                else
                {
                        this.fireMessageSent(session, req);
                }

                return localWrittenBytes;
        }
{code}



> AbstractIoSession getScheduledWriteMessages always -negative?
> -------------------------------------------------------------
>
>                 Key: DIRMINA-1057
>                 URL: https://issues.apache.org/jira/browse/DIRMINA-1057
>             Project: MINA
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 2.0.16
>         Environment: I'm testing slow consumer backlog detection and while 
> getScheduledWriteBytes() correctly grows, getScheduledWriteMessages is always 
> negative and does not increase. looking into code to see why but putting bug 
> report here as well for tracking
>            Reporter: Andre Mermegas
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to