onMessageSent is invoked when ONE message has been sent, not when all messages have been sent.
Anyway, I found an approach that makes it work: I keep track of the number of
messages that are being sent - increase the counter by one before calling
filterWrite and decrease it by one when messageSent is invoked. Then, to test
whether the IoSession is ready to write data, just check if sendingMessages == 0.
============ code begin ================================
package com.banckle.rtmp.util;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.mina.core.filterchain.IoFilterAdapter;
import org.apache.mina.core.future.DefaultWriteFuture;
import org.apache.mina.core.future.WriteFuture;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.core.write.DefaultWriteRequest;
import org.apache.mina.core.write.WriteRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PrioritizedIoFilter extends IoFilterAdapter {
private static final Logger log =
LoggerFactory.getLogger(PrioritizedIoFilter.class);
private static final Object MESSAGE_QUEUE_KEY = new Object();
private static class MessageQueue {
AtomicInteger sendingMessages = new AtomicInteger(0);
PrioritizedQueue<WriteRequest> queue = new
PrioritizedQueue<WriteRequest>();
public boolean isSending() {
return sendingMessages.get() == 0 && queue.isEmpty();
}
}
private static final MessageQueue getWriteQueue(IoSession session) {
return (MessageQueue) session.getAttribute(MESSAGE_QUEUE_KEY);
}
private void internalWriteMessage(NextFilter nextFilter, IoSession
session, MessageQueue mq, WriteRequest writeRequest) {
mq.sendingMessages.incrementAndGet();
nextFilter.filterWrite(session, writeRequest);
}
@Override
public void filterWrite(NextFilter nextFilter, IoSession session,
WriteRequest writeRequest) throws Exception {
// the message queue
MessageQueue mq = getWriteQueue(session);
if (!(writeRequest.getMessage() instanceof PrioritizedMessage))
{
// just forward it to next filter
internalWriteMessage(nextFilter, session, mq,
writeRequest);
return;
}
// unpack the message
PrioritizedMessage msg = (PrioritizedMessage)
writeRequest.getMessage();
Object actualMessage = msg.getActualMessage();
int priority = msg.getPriority();
// create the unpacked write request
WriteFuture future = new DefaultWriteFuture(session);
WriteRequest unpackedRequest = new
DefaultWriteRequest(actualMessage, future, writeRequest.getDestination());
// send it directly if there's no message in the queue,
otherwise put it
// into the queue
if (mq.isSending()) {
log.info("Thd({}) Message {} put into prioritized
queue", Thread.currentThread().getName(), actualMessage);
mq.queue.offer(unpackedRequest, priority);
} else {
log.info("Thd({}) Message {} sent directly",
Thread.currentThread().getName(), actualMessage);
internalWriteMessage(nextFilter, session, mq,
unpackedRequest);
}
}
@Override
public void messageSent(NextFilter nextFilter, IoSession session,
WriteRequest writeRequest) throws Exception {
super.messageSent(nextFilter, session, writeRequest);
// the message queue
MessageQueue mq = getWriteQueue(session);
if (mq.sendingMessages.decrementAndGet() == 0) {
WriteRequest req = mq.queue.poll();
if (req != null) {
internalWriteMessage(nextFilter, session, mq,
req);
}
}
}
@Override
public void sessionCreated(NextFilter nextFilter, IoSession session)
throws Exception {
session.setAttribute(MESSAGE_QUEUE_KEY, new MessageQueue());
super.sessionCreated(nextFilter, session);
}
}
====================== code end =========================
Thanks
--------------------------------------------------
From: "Emmanuel Lcharny" <[email protected]>
Sent: Wednesday, December 16, 2009 4:42 PM
To: <[email protected]>
Subject: Re: How do I test whether a IoSession is ready for write?
> [email protected] a écrit :
>> I want to know whether an IoSession is ready for write: that is, the
>> session's write queue is empty and the last write request is completed. I
>> want to use IoSession.isWriterIdle, but I'm afraid the method returns true
>> even if the queue is not empty, as long as no message is being sent. Am I
>> correct? If not, what would be the proper method to invoke?
>>
> I'm not 100% sure, but isn't the messageSent event propagated only when
> the message has been fully written?
>
> Otherwise, give me a couple of hours to check the code and give you an
> answer.
>> Best regards and many thanks
>> Wei
>
>