Hi all,

I could use a little help with an IoFilter I've written. The
application I'm working on can't use TCP/IP but requires an ARQ
mechanism anyway, which I've built on top of UDP/IP. I've written an
IoFilter to take care of this. It consists of a few components:

1. a Packet class that wraps my payload data with some header information
2. a ProtocolCodec to convert between Packet and IoBuffer
3. an ArqFilter to do the bookkeeping:
3a. on filterWrite(), schedule a task to re-send the Packet if it's
not ACK'ed in time.
3b. on messageReceived(), retrieve ACK information and cancel the
timer for the acknowledged Packet.
3c. on timer task, resend the Packet.
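
To make the bookkeeping in 3a-3c concrete, here is a rough stand-alone
sketch of the idea. It is not my actual filter: RetransmitTracker, its
methods and the integer sequence numbers are names made up purely for
illustration, and the "transmit" Runnable stands in for handing the
Packet to the next filter.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of the ARQ bookkeeping; not the real ArqFilter. */
class RetransmitTracker {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    // Retry timers for packets that are still waiting for an ACK.
    private final Map<Integer, ScheduledFuture<?>> pending = new HashMap<>();
    // Sequence numbers that have been ACK'ed (never pruned in this sketch).
    private final Set<Integer> delivered = new HashSet<>();
    private final long timeoutMillis;
    private final int maxTries;

    RetransmitTracker(long timeoutMillis, int maxTries) {
        this.timeoutMillis = timeoutMillis;
        this.maxTries = maxTries;
    }

    /** 3a: transmit now and schedule a re-send in case no ACK arrives. */
    void send(int seq, Runnable transmit) {
        attempt(seq, transmit, 1);
    }

    /** 3c: also runs on the timer thread for every retry. */
    private synchronized void attempt(int seq, Runnable transmit, int tryNo) {
        if (delivered.contains(seq)) {
            return; // the ACK arrived while this retry was waiting to run
        }
        transmit.run();
        if (tryNo < maxTries) {
            pending.put(seq, scheduler.schedule(
                    () -> attempt(seq, transmit, tryNo + 1),
                    timeoutMillis, TimeUnit.MILLISECONDS));
        } else {
            pending.remove(seq); // out of tries, give up on this packet
        }
    }

    /** 3b: an ACK came in, so cancel the retry timer for that packet. */
    synchronized void acknowledge(int seq) {
        delivered.add(seq);
        ScheduledFuture<?> timer = pending.remove(seq);
        if (timer != null) {
            timer.cancel(false);
        }
    }

    synchronized int pendingCount() {
        return pending.size();
    }

    void shutdown() {
        scheduler.shutdownNow();
    }
}
```

Note that the first transmit happens on the caller's thread, while every
retry happens on the scheduler's thread. That is the same split that
shows up in the thread names in the log output further down.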

The scary bit happens in 3c, where a new invocation of
nextFilter.filterWrite() originates from a timer task halfway down the
filter chain, instead of at the top in IoSession#write(Object) from
the calling thread.

The filter chain looks like this:

(application end)
1.  ExecutorFilter (EventType.MESSAGE_RECEIVED) - multiple threads
2.  ExecutorFilter (EventType.WRITE) - one thread
3.  ArqFilter (includes a ScheduledExecutorService)
4.  ProtocolCodecFilter
(network end)

All the individual components work well in unit tests, but after
assembling the whole thing, there was an unpleasant surprise: Ethereal
revealed that only the first of three tries for every Packet actually
makes it to the network.

I'm using MINA 2.0-M1 for the moment, but I've confirmed the same
thing happens using 2.0-M2.

I've put some additional logging statements into the code, including a
conditional breakpoint on
sun.nio.ch.DatagramChannelImpl#write(ByteBuffer) with a breakpoint
condition of "System.err.println("DatagramChannel write");return
false;" to reveal invocation of the write() method on the underlying
nio Channel (sort of a poor man's AOP, but a useful debugger trick
nonetheless).

For reference, the relevant bits of source are quoted at the end of
the mail. Let me first quote the log output:

>    7671ms TRACE [UDP-writer-1] ArqFilter - trigger send of packet, try 1.
>    7671ms TRACE [UDP-writer-1] ArqFilter - before: sched/written: 0/0
>    7702ms TRACE [UDP-writer-1] ArqFilter - after : sched/written: 1/0
>DatagramChannel write
>
>   22701ms TRACE [pool-1-thread-1] ArqFilter - trigger send of packet, try 2.
>   22701ms TRACE [pool-1-thread-1] ArqFilter - before: sched/written: 0/1
>   22701ms TRACE [pool-1-thread-1] ArqFilter - after : sched/written: 1/1
>
>   37716ms TRACE [pool-1-thread-1] ArqFilter - trigger send of packet, try 3.
>   37716ms TRACE [pool-1-thread-1] ArqFilter - before: sched/written: 1/1
>   37716ms TRACE [pool-1-thread-1] ArqFilter - after : sched/written: 2/1

My retry packets get scheduled, but never written. Stepping through
the code eventually leads me to AbstractPollingIoProcessor#flush(),
where the IoProcessor doesn't get woken up because scheduleFlush()
returns false. My question, of course, is how to get the IoProcessor
to wake up and do something.

The scheduleFlush() call returns false because
AbstractPollingIoSession#setScheduledForFlush(true) does. Apparently,
the AbstractIoSession#scheduledForFlush boolean is still set to true
fifteen and thirty seconds after the first packet was sent. This
suggests I'm somehow keeping it from being cleared. Can anyone help me
here? How do I get this flag to clear?
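
To show what I think is going on, here is a toy model of such a guard
flag. This is only my reading of the behaviour, not MINA's source:
FlushGuard, flushCompleted() and the wakeup counter are invented for
illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Toy model of a "scheduled for flush" guard; hypothetical, not MINA code. */
class FlushGuard {
    private final AtomicBoolean scheduledForFlush = new AtomicBoolean(false);
    private final AtomicInteger wakeups = new AtomicInteger();

    /** Returns true only for the caller that flips the flag from false to true. */
    boolean scheduleFlush() {
        if (scheduledForFlush.compareAndSet(false, true)) {
            wakeups.incrementAndGet(); // stands in for waking up the processor
            return true;
        }
        return false; // already scheduled, so nobody gets woken up
    }

    /** Stands in for the processor finishing a flush and clearing the flag. */
    void flushCompleted() {
        scheduledForFlush.set(false);
    }

    int wakeups() {
        return wakeups.get();
    }
}
```

In this model, a second scheduleFlush() without an intervening
flushCompleted() returns false and wakes nobody, which matches what I
see for tries 2 and 3.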

Source code follows.


Thanks for your time,
Barend




// My IoFilter, abridged

class ArqFilter extends IoFilterAdapter {

   private int TIMEOUT = 15; // configured values
   private int MAX_TRIES = 3;
   private ScheduledExecutorService scheduler; // initialized in constructor


 @Override
 public void filterWrite(final NextFilter next, final IoSession session,
         WriteRequest writeRequest) {

    final Packet packet = (Packet) writeRequest.getMessage();

    // Re-send the packet from the timer thread if it is not ACK'ed in time.
    scheduler.schedule(new Runnable() {
       public void run() {
          if (packet.status() != Status.DELIVERED && packet.tries() < MAX_TRIES) {
              writeToNextFilter(packet, session, next);
          }
       }
    }, TIMEOUT, TimeUnit.SECONDS);

    // not shown: bookkeeping so that messageReceived() can cancel
    // timers and assign Status.DELIVERED as ACKs come in.

    writeToNextFilter(packet, session, next);
 }



  private static void writeToNextFilter(Packet packet, IoSession session,
          NextFilter nextFilter) {
    packet.incrementTries();

    logger.trace("trigger send of packet, try " + packet.tries() + ".");
    logger.trace("before: sched/written: "
            + session.getScheduledWriteMessages() + "/"
            + session.getWrittenMessages());

    nextFilter.filterWrite(session, new DefaultWriteRequest(packet));

    logger.trace("after : sched/written: "
            + session.getScheduledWriteMessages() + "/"
            + session.getWrittenMessages());
  }
}
