I've been able to correct the issues I had... MINA sure needs more
documentation ;-)
These classes demonstrate how to bypass a filter based on session state
and sending/receiving files. Not elegant, but I should be able to go back
to my project now and finish it. Thanks for all the help!
Client.java
public class Client {
private final static Logger LOGGER =
LoggerFactory.getLogger(Client.class);
// private static final int BUFFER_SIZE = 33554432; //32 MB
public static final int BUFFER_SIZE = 16777216; //16 MB
// private static final int BUFFER_SIZE = 8388608; //8 MB
// private static final int BUFFER_SIZE = 4194304; //4 MB
private String hostname = "127.0.0.1";
private int port = 8877;
private File filename;
private static final long CONNECT_TIMEOUT = 15 * 1000L;
private static final int CONNECT_RETRIES = 3;
private IoSession session;
private int cmd = 0;
public Client(String hostname, int port, File infile) {
this.hostname = hostname;
if (port < 1024 || port > 65535) {
LOGGER.error("Invalid port number: " + port);
this.port = 8877;
} else
this.port = port;
this.filename = infile;
}
public void connect() throws InterruptedException {
NioSocketConnector connector = new NioSocketConnector();
connector.setConnectTimeoutMillis(CONNECT_TIMEOUT);
//Let's keep events from firing at the wrong time or simultaneously
connector.getFilterChain().addLast("executor", new
ExecutorFilter());
//Out logging filter
connector.getFilterChain().addLast("logger", new LoggingFilter());
//Initial attributes filter
SessionAttributeInitializingFilter saif = new
SessionAttributeInitializingFilter();
saif.setAttribute("state", SessionState.SEND_FILE);
connector.getFilterChain().addLast("init", saif);
//INFO: Moved file filter after protocol codec
CodecFactory cf = new CodecFactory();
cf.setDecoderMaxObjectSize(BUFFER_SIZE);
connector.getFilterChain().addLast("codec", new CodecFilter(cf));
//INFO: Updated buffer to 8MB
StreamWriteFilter swf = new StreamWriteFilter();
swf.setWriteBufferSize(BUFFER_SIZE);
connector.getFilterChain().addLast("file", swf);
// ObjectSerializationCodecFactory oscf = new
ObjectSerializationCodecFactory();
// oscf.setDecoderMaxObjectSize(BUFFER_SIZE);
// connector.getFilterChain().addLast("codec", new
ProtocolCodecFilter(oscf));
connector.setHandler(new ClientIoSessionHandler(this));
//
connector.getSessionConfig().setMaxReadBufferSize(BUFFER_SIZE);
//
connector.getSessionConfig().setReadBufferSize(BUFFER_SIZE);
for (int i = 0; i < CONNECT_RETRIES; i++) {
try {
ConnectFuture future = connector.connect(new
InetSocketAddress(this.hostname, this.port));
future.awaitUninterruptibly();
session = future.getSession();
break;
} catch (RuntimeIoException e) {
LOGGER.error("Failed to connect.", e);
Thread.sleep(5000);
}
}
if (session != null) {
session.getConfig().setIdleTime(IdleStatus.BOTH_IDLE, 600);
// session.setAttribute(HOME_DIR, homedir);
session.getCloseFuture().awaitUninterruptibly();
}
connector.dispose();
}
public static void main(String[] args) {
if (args.length != 4) {
System.out.println("Usage: java client hostname port [up/down]
filename");
System.exit(1);
}
Client client = new Client(args[0], Integer.parseInt(args[1]), new
File(args[3]));
if (args[2].equalsIgnoreCase("up"))
client.cmd = 1;
else
client.cmd = 2;
try {
client.connect();
} catch (InterruptedException e) {
LOGGER.error("Interrupted Exception: ", e);
}
int i = 0;
while (client.session == null && i < 15) {
try {
System.out.println("Waiting for session...");
Thread.sleep(1000);
} catch (InterruptedException e) {
System.out.println("Couldn't go to sleep...");
}
i++;
}
}
public int getCmd() {
return cmd;
}
public File getFilename() {
return filename;
}
}
------------------------------
ClientIOSessionHandler.java
public class ClientIoSessionHandler extends IoHandlerAdapter {
private final static Logger LOGGER =
LoggerFactory.getLogger(ClientIoSessionHandler.class);
public static final String SESSION_NONCE = "nonce";
public static final String SESSION_STATE = "state";
public static final String CLIENT_CODE = "client_code";
public static final String CLIENT_NAME = "client_name";
public static final String FILEDIR_TREE = "filedir_tree";
public static final String USERNAME = "username";
public static final String PASSWORD = "password";
public static final String CURRENT_FILE = "current_file";
public static final String FILE_STREAM = "file_stream";
public static final String FILE_OUT = "file_out";
public static final String MSG_COUNTER = "message_counter";
public static final String HOME_DIR = "home_dir";
public static final String NEW_ATTRIBUTE = "new_attribute";
public static final String FILE_REGION = "file_region";
public static final String FILE_POSITION = "file_position";
public static final String FILE_SIZE = "file_size";
private final Client parent;
public ClientIoSessionHandler(Client parent) {
this.parent = parent;
}
@Override
public void sessionOpened(IoSession session) {
String method = "sessionOpened";
session.setAttribute(MSG_COUNTER, 0L);
//Set our home directory to something useful
session.setAttribute(HOME_DIR, new File("/"));
if (parent.getCmd() == 1) {
session.setAttribute(SESSION_STATE, SessionState.SEND_FILE);
if (parent.getFilename() != null &&
parent.getFilename().exists()) {
session.setAttribute(CURRENT_FILE, parent.getFilename());
FileInputStream fsend = null;
try {
fsend = new
FileInputStream((File)session.getAttribute(CURRENT_FILE));
DefaultFileRegion dfr = new
DefaultFileRegion(fsend.getChannel());
session.setAttribute(FILE_STREAM, fsend);
session.setAttribute(FILE_REGION, dfr);
IoBuffer tbuff = (new
SimpleBufferAllocator()).allocate(8, false);
tbuff.putLong(dfr.getRemainingBytes());
tbuff.flip();
session.write(tbuff);
} catch (FileNotFoundException e) {
LOGGER.error("File not found exception. Unable to send
file.");
session.close(true);
} catch (IOException e) {
LOGGER.error("IOException in FileRegion. Unable to
send file.");
session.close(true);
}
} else {
LOGGER.error("File not found. Unable to send file.");
session.close(true);
}
} else if (parent.getCmd() == 2) {
session.setAttribute(SESSION_STATE, SessionState.RECV_FILE);
session.setAttribute(CURRENT_FILE, parent.getFilename());
session.setAttribute(FILE_POSITION, 0L);
} else {
LOGGER.error("Invalid parent command");
session.close(true);
}
LOGGER.info("MSG Counter: " + session.getAttribute(MSG_COUNTER) +
" Method: " + method + " State: " +
session.getAttribute(SESSION_STATE));
}
@Override
public void messageReceived(IoSession session, Object message) {
String method = "messageReceived";
//TODO: Add appropriate code to deal with server busy states
if (session.getAttribute(SESSION_STATE) == SessionState.RECV_FILE)
{
File myfile = (File)session.getAttribute(CURRENT_FILE);
if (message instanceof IoBuffer) {
loggerHelper(session, method, message);
IoBuffer fbuff = (IoBuffer)message;
if (!session.containsAttribute(FILE_SIZE)) {
long fsize = fbuff.getLong();
session.setAttribute(FILE_SIZE, (Long)fsize);
}
InputStream frecv = fbuff.asInputStream();
FileOutputStream fout;
// LOGGER.info("Bytes remaining: " +
frecv.getRemainingBytes() + "Filename: ", frecv.getFilename());
long position = 0L;
try {
if (session.containsAttribute(FILE_OUT)) {
fout =
(FileOutputStream)session.getAttribute(FILE_OUT);
} else {
fout = new FileOutputStream(new
File(((File)session.getAttribute(HOME_DIR)).getPath().concat(File.separator
+
myfile.getPath())), false);
session.setAttribute(FILE_OUT, fout);
}
if (session.containsAttribute(FILE_POSITION)) {
position =
(Long)session.getAttribute(FILE_POSITION);
} else {
position = 0L;
}
//TODO: Very slow reading/writing - Fix with ByteArray
reading/writing instead
/*
int i = 0; //TODO: We are closing the stream too fast.
Fix
while ((i = frecv.read()) != -1) {
fout.write(i);
position++;
}
*/
position = position + frecv.available();
byte[] readAll = new byte[frecv.available()];
frecv.read(readAll);
fout.write(readAll);
frecv.close();
session.setAttribute(FILE_POSITION, position);
long tsize = (Long)session.getAttribute(FILE_SIZE);
// if
((Long)session.getAttribute(FILE_SIZE) ==
(Long)session.getAttribute(FILE_POSITION)) {
if (tsize == position) {
fout.close();
session.close(false);
}
} catch (FileNotFoundException e) {
LOGGER.error("Couldn't write to file: " +
myfile.getName() + " Exception: " + e.getMessage());
session.removeAttribute(CURRENT_FILE);
session.removeAttribute(FILE_OUT);
session.setAttribute(SESSION_STATE, SessionState.IDLE);
} catch (IOException e) {
LOGGER.error("Couldn't write to file: " +
myfile.getName() + " Exception: " + e.getMessage());
session.removeAttribute(CURRENT_FILE);
session.removeAttribute(FILE_OUT);
session.setAttribute(SESSION_STATE, SessionState.IDLE);
}
} else {
LOGGER.error("In RECV_FILE state and no IoBuffer
received");
loggerHelper(session, method, message);
session.close(false);
}
} else if (session.getAttribute(SESSION_STATE) ==
SessionState.SEND_FILE) {
LOGGER.error("In SEND_FILE state, so we should not receive a
message");
loggerHelper(session, method, message);
session.close(false);
} else if (session.getAttribute(SESSION_STATE) ==
SessionState.IDLE) {
LOGGER.error("In IDLE state, so we should not receive a
message");
loggerHelper(session, method, message);
session.close(false);
}
//This should never happen
else {
LOGGER.error("Received a message with no valid STATE");
loggerHelper(session, method, message);
session.close(false);
}
}
@Override
public void messageSent(IoSession session, Object message) {
String method = "messageSent";
//Streams are not closed by the file filter automatically, so we
must close them here
if(session.getAttribute(SESSION_STATE) == SessionState.SEND_FILE
&& session.containsAttribute(FILE_REGION)) {
loggerHelper(session, method, message);
session.write(session.getAttribute(FILE_REGION));
session.removeAttribute(FILE_REGION);
}
else if(session.getAttribute(SESSION_STATE) ==
SessionState.SEND_FILE && session.containsAttribute(FILE_STREAM)) {
loggerHelper(session, method, message);
FileInputStream fsend;
fsend = (FileInputStream) session.getAttribute(FILE_STREAM);
try {
fsend.close();
LOGGER.info("Closed Stream");
} catch (IOException e) {
LOGGER.error("Unable to close stream: " + e.getMessage());
}
session.removeAttribute(FILE_STREAM);
session.setAttribute(SESSION_STATE, SessionState.WAIT_CMD);
}
else {
loggerHelper(session, method, message);
}
}
@Override
public void sessionIdle(IoSession session, IdleStatus status) {
LOGGER.info("Session idle... disconnecting.");
session.close(true);
}
@Override
public void exceptionCaught(IoSession session, Throwable cause) {
//Let's close the connection since we can't recover
// LOGGER.error("Unrecoverable Exception on Session: " +
session.getAttribute(CLIENT_NAME), cause);
LOGGER.error("Unrecoverable Exception on Session: " +
session.getId() + " Exception: " +
cause.getClass().toString());
LOGGER.error("Stack: " + cause.getMessage());
LOGGER.error("Throwable", cause);
session.close(true);
}
private void loggerHelper(IoSession session, String method, Object
message) {
session.setAttribute(MSG_COUNTER,
(Long)session.getAttribute(MSG_COUNTER) + 1);
LOGGER.info("MSG Counter: " + session.getAttribute(MSG_COUNTER) +
" Method: " + method + " State: " +
session.getAttribute(SESSION_STATE) + " Message Class:
" + message.getClass().toString());
}
}
-------------------
Server.java
public class Server {
public Server() {
super();
}
private final static Logger LOGGER =
LoggerFactory.getLogger(Server.class);
// private static final int BUFFER_SIZE = 33554432; //32 MB
public static final int BUFFER_SIZE = 16777216; //16 MB
// private static final int BUFFER_SIZE = 8388608; //8 MB
// private static final int BUFFER_SIZE = 4194304; //4 MB
//Default Port and Address
private int SERVER_PORT = 8877;
private String SERVER_ADDRESS = "127.0.0.1";
private File filename = null;;
private int cmd = 0;
public void start() throws InterruptedException {
NioSocketAcceptor acceptor = new NioSocketAcceptor();
//Let's keep events from firing at the wrong time or simultaneously
acceptor.getFilterChain().addLast("executor", new
ExecutorFilter());
//We want to instert a logging filter
acceptor.getFilterChain().addLast("logger", new LoggingFilter());
//Initial attributes filter
SessionAttributeInitializingFilter saif = new
SessionAttributeInitializingFilter();
saif.setAttribute("state", SessionState.SEND_FILE);
acceptor.getFilterChain().addLast("init", saif);
//INFO: Moved file filter after protocol codec
//Let's use object codec serialization until we develop our own
codec
//INFO: Moved file filter after protocol codec
CodecFactory cf = new CodecFactory();
cf.setDecoderMaxObjectSize(BUFFER_SIZE);
acceptor.getFilterChain().addLast("codec", new CodecFilter(cf));
//Let's add an filter to deal with File I/O
//INFO: Updated buffer to 8MB
StreamWriteFilter swf = new StreamWriteFilter();
swf.setWriteBufferSize(BUFFER_SIZE);
acceptor.getFilterChain().addLast("file", swf);
// ObjectSerializationCodecFactory oscf = new
ObjectSerializationCodecFactory();
// oscf.setDecoderMaxObjectSize(BUFFER_SIZE);
// acceptor.getFilterChain().addLast("codec", new
ProtocolCodecFilter(oscf));
//Let's set our main IoSession Handler here
acceptor.setHandler(new ServerIoSessionHandler(this));
//
acceptor.getSessionConfig().setMaxReadBufferSize(BUFFER_SIZE);
//
acceptor.getSessionConfig().setReadBufferSize(BUFFER_SIZE);
//Let's bind to our address and port number. Give it 5 tries...
for (int i=0; i<5; i++)
try {
acceptor.bind(new InetSocketAddress(SERVER_ADDRESS,
SERVER_PORT));
break;
} catch (IOException ioe) {
// TODO: Add catch code
LOGGER.error("Server was unable to bind to port: " +
SERVER_PORT + " Error: " + ioe.getMessage());
Thread.sleep(1000);
}
if (acceptor.isActive())
//We are done
LOGGER.info("Server started on address and port: " +
SERVER_ADDRESS + ":" + SERVER_PORT);
else {
LOGGER.info("Server could not start");
System.exit(1);
}
}
public static void main(String[] args) {
Server server = new Server();
//Did we get non-default host and port arguments
if (args.length == 4) {
//Assign the port number
try {
server.SERVER_PORT = Integer.parseInt(args[1]);
} catch (NumberFormatException e) {
LOGGER.error(e.getMessage());
}
//Only allow ports in the user range
if (server.SERVER_PORT < 1024 || server.SERVER_PORT > 65535) {
LOGGER.error("Invalid port number: " + server.SERVER_PORT);
System.exit(1);
}
//TODO: Validate the host address
server.SERVER_ADDRESS = args[0];
if (args[2].equalsIgnoreCase("up"))
server.cmd = 1;
else
server.cmd = 2;
try {
server.start();
} catch (InterruptedException e) {
LOGGER.error("Interrupted Exception: ", e);
}
server.filename = new File(args[3]);
}
else {
System.out.println("Usage: java server hostname port [up/down]
filename");
System.exit(1);
}
}
public int getCmd() {
return cmd;
}
public File getFilename() {
return filename;
}
}
--------------------------
ServerIoSessionHandler.java
public class ServerIoSessionHandler extends IoHandlerAdapter {
private final static Logger LOGGER =
LoggerFactory.getLogger(ServerIoSessionHandler.class);
public static final String SESSION_NONCE = "nonce";
public static final String SESSION_STATE = "state";
public static final String CLIENT_CODE = "client_code";
public static final String CLIENT_NAME = "client_name";
public static final String FILEDIR_TREE = "filedir_tree";
public static final String USERNAME = "username";
public static final String PASSWORD = "password";
public static final String CURRENT_FILE = "current_file";
public static final String FILE_STREAM = "file_stream";
public static final String FILE_OUT = "file_out";
public static final String MSG_COUNTER = "message_counter";
public static final String HOME_DIR = "home_dir";
public static final String NEW_ATTRIBUTE = "new_attribute";
public static final String FILE_REGION = "file_region";
public static final String FILE_POSITION = "file_position";
public static final String FILE_SIZE = "file_size";
private final Server parent;
public ServerIoSessionHandler(Server parent) {
this.parent = parent;
}
@Override
public void sessionOpened(IoSession session) {
String method = "sessionOpened";
session.setAttribute(MSG_COUNTER, 0L);
//Set our home directory to something useful
session.setAttribute(HOME_DIR, new File("/"));
//Set our idle time to 600 seconds
session.getConfig().setIdleTime(IdleStatus.BOTH_IDLE, 600);
// //Initialize our state
// session.setAttribute(SESSION_STATE, SessionState.WAIT_INIT);
//
if (parent.getCmd() == 1) {
session.setAttribute(SESSION_STATE, SessionState.SEND_FILE);
if (parent.getFilename() != null &&
parent.getFilename().exists()) {
session.setAttribute(CURRENT_FILE, parent.getFilename());
FileInputStream fsend = null;
try {
fsend = new
FileInputStream((File)session.getAttribute(CURRENT_FILE));
DefaultFileRegion dfr = new
DefaultFileRegion(fsend.getChannel());
session.setAttribute(FILE_STREAM, fsend);
session.setAttribute(FILE_REGION, dfr);
IoBuffer tbuff = (new
SimpleBufferAllocator()).allocate(8, false);
tbuff.putLong(dfr.getRemainingBytes());
tbuff.flip();
session.write(tbuff);
} catch (FileNotFoundException e) {
LOGGER.error("File not found exception. Unable to send
file.");
session.close(true);
} catch (IOException e) {
LOGGER.error("IOException in FileRegion. Unable to
send file.");
session.close(true);
}
}
else {
LOGGER.error("File not found. Unable to send file.");
session.close(true);
}
}
else if (parent.getCmd() == 2) {
session.setAttribute(SESSION_STATE, SessionState.RECV_FILE);
session.setAttribute(CURRENT_FILE, parent.getFilename());
session.setAttribute(FILE_POSITION, 0L);
}
else {
LOGGER.error("Invalid parent command");
session.close(true);
}
LOGGER.info("MSG Counter: " + session.getAttribute(MSG_COUNTER) +
" Method: " + method + " State: " + session.getAttribute(SESSION_STATE));
}
@Override
public void messageReceived(IoSession session, Object message) {
String method = "messageReceived";
//TODO: Add appropriate code to deal with server busy states
if (session.getAttribute(SESSION_STATE) == SessionState.RECV_FILE)
{
File myfile = (File)session.getAttribute(CURRENT_FILE);
if (message instanceof IoBuffer) {
loggerHelper(session, method, message);
IoBuffer fbuff = (IoBuffer)message;
if (!session.containsAttribute(FILE_SIZE)) {
long fsize = fbuff.getLong();
session.setAttribute(FILE_SIZE, (Long)fsize);
}
InputStream frecv = fbuff.asInputStream();
FileOutputStream fout;
// LOGGER.info("Bytes remaining: " +
frecv.getRemainingBytes() + "Filename: ", frecv.getFilename());
long position = 0L;
try {
if (session.containsAttribute(FILE_OUT)) {
fout =
(FileOutputStream)session.getAttribute(FILE_OUT);
} else {
fout = new FileOutputStream(new
File(((File)session.getAttribute(HOME_DIR)).getPath().concat(File.separator
+
myfile.getPath())), false);
session.setAttribute(FILE_OUT, fout);
}
if (session.containsAttribute(FILE_POSITION)) {
position =
(Long)session.getAttribute(FILE_POSITION);
} else {
position = 0L;
}
int i = 0; //TODO: We are closing the stream too fast.
Fix
//TODO: Very slow reading/writing - Fix with ByteArray
reading/writing instead
/*
int i = 0; //TODO: We are closing the stream too fast.
Fix
while ((i = frecv.read()) != -1) {
fout.write(i);
position++;
}
*/
position = position + frecv.available();
byte[] readAll = new byte[frecv.available()];
frecv.read(readAll);
fout.write(readAll);
frecv.close();
session.setAttribute(FILE_POSITION, position);
long tsize = (Long)session.getAttribute(FILE_SIZE);
// if
((Long)session.getAttribute(FILE_SIZE) ==
(Long)session.getAttribute(FILE_POSITION)) {
if (tsize == position) {
fout.close();
session.close(false);
}
} catch (FileNotFoundException e) {
LOGGER.error("Couldn't write to file: " +
myfile.getName() + " Exception: " + e.getMessage());
session.removeAttribute(CURRENT_FILE);
session.removeAttribute(FILE_OUT);
session.setAttribute(SESSION_STATE, SessionState.IDLE);
} catch (IOException e) {
LOGGER.error("Couldn't write to file: " +
myfile.getName() + " Exception: " + e.getMessage());
session.removeAttribute(CURRENT_FILE);
session.removeAttribute(FILE_OUT);
session.setAttribute(SESSION_STATE, SessionState.IDLE);
}
} else {
LOGGER.error("In RECV_FILE state and no IoBuffer
received");
loggerHelper(session, method, message);
session.close(false);
}
} else if (session.getAttribute(SESSION_STATE) ==
SessionState.SEND_FILE) {
LOGGER.error("In SEND_FILE state, so we should not receive a
message");
loggerHelper(session, method, message);
session.close(false);
} else if (session.getAttribute(SESSION_STATE) ==
SessionState.IDLE) {
LOGGER.error("In IDLE state, so we should not receive a
message");
loggerHelper(session, method, message);
session.close(false);
}
//This should never happen
else {
LOGGER.error("Received a message with no valid STATE");
loggerHelper(session, method, message);
session.close(false);
}
}
@Override
public void messageSent(IoSession session, Object message) {
String method = "messageSent";
//Streams are not closed by the file filter automatically, so we
must close them here
if(session.getAttribute(SESSION_STATE) == SessionState.SEND_FILE
&& session.containsAttribute(FILE_REGION)) {
loggerHelper(session, method, message);
session.write(session.getAttribute(FILE_REGION));
session.removeAttribute(FILE_REGION);
}
else if(session.getAttribute(SESSION_STATE) ==
SessionState.SEND_FILE && session.containsAttribute(FILE_STREAM)) {
loggerHelper(session, method, message);
FileInputStream fsend;
fsend = (FileInputStream) session.getAttribute(FILE_STREAM);
try {
fsend.close();
LOGGER.info("Closed Stream");
} catch (IOException e) {
LOGGER.error("Unable to close stream: " + e.getMessage());
}
session.removeAttribute(FILE_STREAM);
session.setAttribute(SESSION_STATE, SessionState.WAIT_CMD);
}
else {
loggerHelper(session, method, message);
}
}
//Have we been idel beyond our timeout? Then disconnect
@Override
public void sessionIdle(IoSession session, IdleStatus status) {
LOGGER.info("Session idle... disconnecting.");
session.close(true);
}
//Log any unrecoverable exceptions and close the connection
@Override
public void exceptionCaught(IoSession session, Throwable cause) {
//If this is a NotSerializableException we should continue and
hope for the best ;-)
if (cause.getClass() == ProtocolDecoderException.class ||
cause.getClass() == ProtocolEncoderException.class)
LOGGER.warn("Exception: " + cause.getClass().toString() + "
State: " + session.getAttribute(SESSION_STATE));
else {
//Let's close the connection since we can't recover
// LOGGER.error("Unrecoverable Exception on Session: " +
session.getAttribute(CLIENT_NAME), cause);
LOGGER.error("Unrecoverable Exception on Session: " +
session.getAttribute(CLIENT_NAME) + " Exception: " +
cause.getClass().toString());
session.close(true);
}
}
private void loggerHelper(IoSession session, String method, Object
message) {
session.setAttribute(MSG_COUNTER,
(Long)session.getAttribute(MSG_COUNTER) + 1);
LOGGER.info("MSG Counter: " +
session.getAttribute(MSG_COUNTER) + " Method: " + method + " State: " +
session.getAttribute(SESSION_STATE) + " Message Class: " +
message.getClass().toString());
}
}
-----------------------
CodecFactory.java
public class CodecFactory extends ObjectSerializationCodecFactory {
private final CodecEncoder encoder;
private final CodecDecoder decoder;
public CodecFactory() {
this(Thread.currentThread().getContextClassLoader());
}
public CodecFactory(ClassLoader classLoader) {
encoder = new CodecEncoder();
decoder = new CodecDecoder(classLoader);
}
@Override
public ProtocolEncoder getEncoder(IoSession session) {
return encoder;
}
@Override
public ProtocolDecoder getDecoder(IoSession session) {
return decoder;
}
@Override
public int getEncoderMaxObjectSize() {
return encoder.getMaxObjectSize();
}
@Override
public void setEncoderMaxObjectSize(int maxObjectSize) {
encoder.setMaxObjectSize(maxObjectSize);
}
@Override
public int getDecoderMaxObjectSize() {
return decoder.getMaxObjectSize();
}
@Override
public void setDecoderMaxObjectSize(int maxObjectSize) {
decoder.setMaxObjectSize(maxObjectSize);
}
}
-----------------------------
CodecFilter.java
public class CodecFilter extends ProtocolCodecFilter {
private final static Logger LOGGER =
LoggerFactory.getLogger(CodecFilter.class);
public CodecFilter(Class<? extends ProtocolEncoder> class1, Class<?
extends ProtocolDecoder> class2) {
super(class1, class2);
}
public CodecFilter(ProtocolEncoder protocolEncoder, ProtocolDecoder
protocolDecoder) {
super(protocolEncoder, protocolDecoder);
}
public CodecFilter(ProtocolCodecFactory protocolCodecFactory) {
super(protocolCodecFactory);
}
@Override
public void filterWrite(NextFilter nextFilter, IoSession session,
WriteRequest writeRequest) throws Exception {
if (session.getAttribute("state") == SessionState.RECV_FILE ||
session.getAttribute("state") == SessionState.SEND_FILE) {
LOGGER.info("FilterWrite override successful. State: " +
session.getAttribute("state") + " Filter: " +
nextFilter.getClass().toString());
nextFilter.filterWrite(session, writeRequest);
return;
}
else {
LOGGER.info("SerializationCodec FilterWrite in use. State: " +
session.getAttribute("state"));
super.filterWrite(nextFilter, session, writeRequest);
}
}
@Override
public void messageReceived(NextFilter nextFilter, IoSession session,
Object message) throws Exception {
if (session.getAttribute("state") == SessionState.RECV_FILE ||
session.getAttribute("state") == SessionState.SEND_FILE) {
LOGGER.info("MessageReceived override successful. State: " +
session.getAttribute("state") + " Filter: " +
nextFilter.getClass().toString());
nextFilter.messageReceived(session, message);
return;
}
else {
LOGGER.info("SerializationCodec MessageReceived in use. State:
" + session.getAttribute("state"));
super.messageReceived(nextFilter, session, message);
}
}
@Override
public void messageSent(NextFilter nextFilter, IoSession session,
WriteRequest writeRequest) throws Exception {
if (session.getAttribute("state") == SessionState.RECV_FILE ||
session.getAttribute("state") == SessionState.SEND_FILE) {
LOGGER.info("MessageSent override successful. State: " +
session.getAttribute("state") + " Filter: " +
nextFilter.getClass().toString());
nextFilter.messageSent(session, writeRequest);
return;
}
else {
LOGGER.info("SerializationCodec MessageSent in use. State: " +
session.getAttribute("state"));
super.messageSent(nextFilter, session, writeRequest);
}
}
}
------------------------------
CodecEncoder.java
public class CodecEncoder extends ObjectSerializationEncoder {
private final static Logger LOGGER =
LoggerFactory.getLogger(CodecEncoder.class);
private int maxObjectSize = Integer.MAX_VALUE;
public CodecEncoder() {
super();
}
@Override
public void encode(IoSession session, Object message,
ProtocolEncoderOutput out) throws Exception {
if (session.getAttribute("state") == SessionState.RECV_FILE ||
session.getAttribute("state") == SessionState.SEND_FILE) {
LOGGER.info("Encoder override successful");
return;
}
else
LOGGER.info("Serialization Encoder in use. State: " +
session.getAttribute("state"));
super.encode(session, message, out);
}
}
--------------------------
CodecDecoder.java
public class CodecDecoder extends ObjectSerializationDecoder {
private final static Logger LOGGER =
LoggerFactory.getLogger(CodecDecoder.class);
private final ClassLoader classLoader;
private int maxObjectSize = 1048576;
public CodecDecoder() {
this(Thread.currentThread().getContextClassLoader());
}
public CodecDecoder(ClassLoader classLoader) {
if (classLoader == null) {
throw new IllegalArgumentException("classloader");
}
this.classLoader = classLoader;
}
/*
@Override
protected boolean doDecode(IoSession session, IoBuffer in,
ProtocolDecoderOutput out) throws Exception {
if (session.getAttribute("state") == SessionState.RECV_FILE ||
session.getAttribute("state") == SessionState.SEND_FILE) {
LOGGER.info("doDecoder Override");
out.write(in);
return true;
}
else {
LOGGER.info("Serialization doDecoder in use. State: " +
session.getAttribute("state"));
return super.doDecode(session, in, out);
}
}
*/
@Override
public void decode(IoSession session, IoBuffer in,
ProtocolDecoderOutput out) throws Exception {
if (session.getAttribute("state") == SessionState.RECV_FILE ||
session.getAttribute("state") == SessionState.SEND_FILE) {
LOGGER.info("Decoder Override");
return;
}
else {
LOGGER.info("Serialization Decoder in use. State: " +
session.getAttribute("state"));
super.decode(session, in, out);
}
}
}