The ProtocolCodecFilter works on batches rather than on a stream. When you use it, you have to be careful to ensure that you obtain all of the processed messages. This could explain the dropped messages you are seeing. On Sep 16, 2013 3:20 PM, "Karrys, Michael (IS)" <mike.kar...@ngc.com> wrote:
> Thanks. Knew it was something simple. I added the following and it now > works. > connector.getFilterChain().addLast( > "codec", > new ProtocolCodecFilter( > new ObjectSerializationCodecFactory())); > connector.getFilterChain().addLast("logger", new LoggingFilter()); > > But it drops messages. The SumUp example does not seem to drop messages > when I changed it to use the ObjectSerializationCodecFactory but I will > have to verify that. > > -----Original Message----- > From: Jon V. [mailto:sybersn...@gmail.com] > Sent: Monday, September 16, 2013 12:25 PM > To: users@mina.apache.org > Subject: EXT :Re: Trouble with messageReceived() > > On the client you are writing a hashmap to the session. I don't see a > filter which turns the hashmap into an IoBuffer? Only IoBuffer can be > written to the socket directly. > On Sep 16, 2013 1:12 PM, "Karrys, Michael (IS)" <mike.kar...@ngc.com> > wrote: > > > I was wondering if someone could look at this and tell me what I am > > doing wrong. I don't seem to be getting the messageReceived() called > > on either the client or the server code. I have been able to get sumup > > example to run and I used code from that example to mashup this test > > code. I am running OS/X with java 7 using mina 2.0.7. The logger shows > > the CREATED, OPENED, and CLOSED events for the session but the > > messageReceived() routine never seems to be called. Is there something > simple I am missing here? > > > > Thanks, > > Mike Karrys > > mike.kar...@ngc.com > > > > Server Code: > > > > public class Main extends IoHandlerAdapter { > > > > private static final int SERVER_PORT = 8080; > > private final static Logger LOGGER = > > LoggerFactory.getLogger(org.jeuron.test.mina.fl.server.Main.class); > > > > public void init() throws IOException { > > NioSocketAcceptor acceptor = new NioSocketAcceptor(); > > > > // Prepare the service configuration. 
> > acceptor.getFilterChain().addLast( > > "codec", > > new ProtocolCodecFilter( > > new ObjectSerializationCodecFactory())); > > > > acceptor.getFilterChain().addLast("logger", new > > LoggingFilter()); > > > > acceptor.setHandler(this); > > acceptor.bind(new InetSocketAddress(SERVER_PORT)); > > > > System.out.println("Listening on port " + SERVER_PORT); > > > > } > > > > public Map process(Map record) { > > int count = (Integer) record.get(Field.COUNT); > > int length = (Integer) record.get(Field.LENGTH); > > String input = (String) record.get(Field.CONTENTS); > > StringBuilder output = new StringBuilder(); > > > > System.out.print("Processing(" + (length + 1) + ")\r"); > > for (int i = input.length(); i > 0; i--) { > > output.append(input.charAt(i - 1)); > > } > > record.put(Field.CONTENTS, output.toString()); > > > > if (count == length) { > > System.out.println("\nFinished Processing " + (count + 1)); > > } > > > > return record; > > } > > > > @Override > > public void sessionOpened(IoSession session) { > > LOGGER.info("sessionOpened."); > > session.getConfig().setIdleTime(IdleStatus.BOTH_IDLE, 60); > > } > > > > @Override > > public void messageReceived(IoSession session, Object message) { > > Map input; > > Map output; > > > > LOGGER.info("messageReceived(" + message + ")"); > > if (message instanceof Map) { > > input = (Map) message; > > output = process(input); > > session.write(output); > > } > > } > > > > @Override > > public void sessionIdle(IoSession session, IdleStatus status) { > > LOGGER.info("Disconnecting the idle."); > > session.close(true); > > } > > > > @Override > > public void exceptionCaught(IoSession session, Throwable cause) { > > session.close(true); > > } > > > > public static void main(String[] args) throws Exception { > > Main main = new Main(); > > main.init(); > > } > > } > > > > Client Code: > > > > public class Main extends IoHandlerAdapter { > > > > private final static Logger LOGGER = > > 
LoggerFactory.getLogger(org.jeuron.test.mina.fl.client.Main.class); > > private static final String HOSTNAME = "localhost"; > > private static final int PORT = 8080; > > private NioSocketConnector connector; > > private IoSession session; > > static char[] letter = {'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', > > 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', > > 'W', 'X', 'Y', 'Z'}; > > private long startTime = 0; > > private long endTime = 0; > > private int maxSent = 0; > > private int receivedRecords = 0; > > private long totalBytes = 0; > > private int recordLengthValue = 0; > > private int recordCountValue = 0; > > > > public String readInput(String prompt) throws IOException { > > String input = null; > > > > System.out.println(prompt); > > BufferedReader br = new BufferedReader(new > > InputStreamReader(System.in)); > > input = br.readLine(); > > > > return input; > > } > > > > public boolean init() throws Exception { > > SocketAddress address = new InetSocketAddress(HOSTNAME, PORT); > > LOGGER.info("Main:init() address[" + address + "]"); > > > > connector = new NioSocketConnector(); > > connector.setHandler(this); > > for (;;) { > > try { > > ConnectFuture future = connector.connect(new > > InetSocketAddress(HOSTNAME, PORT)); > > future.awaitUninterruptibly(); > > session = future.getSession(); > > break; > > } catch (RuntimeIoException e) { > > System.err.println("Failed to connect."); > > e.printStackTrace(); > > Thread.sleep(5000); > > } > > } > > > > //session = future1.getSession(); > > > > return true; > > } > > > > @Override > > public void sessionOpened(IoSession session) { > > try { > > LOGGER.info("sessionOpened."); > > this.session = session; > > fixedLength(recordLengthValue, recordCountValue); > > > > } catch (Exception ex) { > > LOGGER.info("sessionOpened.exception(" + ex + ")"); > > } > > } > > > > @Override > > public void messageReceived(IoSession session, Object message) { > > LOGGER.info("Main:messageReceived() 
message[" + message + "]"); > > //System.out.println("Main:messageReceived() message[" + > > message + "]"); > > > > if (message instanceof Map) { > > Map record = (Map) message; > > totalBytes = totalBytes + (Integer) record.get(Field.LENGTH); > > receivedRecords = receivedRecords + 1; > > > > if (receivedRecords == maxSent) { > > endTime = System.currentTimeMillis(); > > System.out.format("<<<<**** FixedLength( Total > > Records(%d), Total Bytes Read(%,10d), Avg Bytes Read(%d)%nThat took " > > + (endTime - startTime) + " milliseconds\n\n", receivedRecords, > > totalBytes, totalBytes / receivedRecords); > > } else { > > System.out.print("***<<<< FixedLength.received(" + > > receivedRecords + ")\r"); > > } > > } > > } > > > > @Override > > public void exceptionCaught(IoSession session, Throwable cause) { > > session.close(true); > > } > > > > public void fixedLength(int lengthValue, int countValue) throws > > Exception { > > Map record = null; > > > > startTime = System.currentTimeMillis(); > > totalBytes = 0; > > receivedRecords = 0; > > StringBuffer contents = new StringBuffer(); > > maxSent = countValue; > > > > int k = 0; > > for (int i = 0; i < lengthValue; i++) { > > contents = contents.append(letter[k]); > > if (k == 25) { > > k = 0; > > } else { > > k++; > > } > > } > > > > for (int i = 0; i < maxSent; i++) { > > > > record = new HashMap(); > > record.put(Field.LENGTH, Integer.valueOf(i)); > > record.put(Field.COUNT, i); > > record.put(Field.CONTENTS, contents.toString()); > > > > LOGGER.info("Main:fixedLength() Before session.write"); > > session.write(record); > > } > > System.out.println("\n"); > > } > > > > public void process() throws Exception { > > String input = null; > > String[] line = null; > > > > > > try { > > > > input = null; > > input = readInput("\nEnter Record Length: "); > > line = input.split(" "); > > recordLengthValue = Integer.parseInt(line[0]); > > > > input = null; > > input = readInput("\nEnter Record Count: "); > > line = 
input.split(" "); > > recordCountValue = Integer.parseInt(line[0]); > > > > // fixedLength(recordLengthValue, recordCountValue); > > init(); > > > > } finally { > > connector.dispose(); > > } > > > > } > > > > public static void main(String[] args) throws Exception { > > > > Main main = new Main(); > > // main.init(); > > main.process(); > > > > } > > } > > > > > > >