I'm new to MINA and have been working on a small project for a few weeks now.
My application is fairly simple… I'm sending commands to a server via POJO
messages. In some cases the server or the client will send a file instead of a
POJO. I'm using the StreamWriteFilter and the ObjectSerializationCodec
ProtocolCodecFilter. I can send objects back and forth fairly successfully, but
I can't send large files and sometimes even small files fail to be sent.
Here is the exception I get when sending files:
00:46:09] pool-4-thread-4 WARN [] [] [client.ClientIoSessionHandler] -
Exception: class org.apache.mina.filter.codec.ProtocolDecoderException State:
RECV_FILE_WAIT
[00:46:09] pool-4-thread-4 ERROR [] [] [client.ClientIoSessionHandler] -
Throwable:
org.apache.mina.filter.codec.ProtocolDecoderException:
org.apache.mina.core.buffer.BufferDataException: dataLength: 1347093252
Any ideas why am I getting this exception? How should I go about having the
ObjectSerializationCodec handle the POJOs while StreamWriteFilter handles the
files?
And here are the relevant sections of code:
Client.java
package client;
public class Client extends SwingWorker<Void, Void>/*Thread*/ {
private final static Logger LOGGER = LoggerFactory.getLogger(Client.class);
private String hostname = "127.0.0.1";
private int port = 8877;
private String username = null;
private String password = null;
private String computername;
private FileCmdQueue filecmdqueue = null;
private File homedir = null;
private TrayIcon trayicon = null;
private static final long CONNECT_TIMEOUT = 15 *1000L;
private static final int CONNECT_RETRIES = 3;
private Boolean flag = true; //Send a correct hello or a failed
hello on connect
private IoSession session;
public Client(String hostname, int port, String username, String pwd) {
this.hostname = hostname;
this.password = pwd;
this.username = username;
this.computername = null;
if (port < 1024 || port > 65535) {
LOGGER.error("Invalid port number: " + port);
this.port = 8877;
}
else
this.port = port;
try{
computername=InetAddress.getLocalHost().getHostName();
}catch (Exception e){
LOGGER.error("Exception caught = "+ e.getMessage());
computername = "unknown";
}
}
public void Connect(Boolean flag) throws InterruptedException {
if (session == null || !session.isConnected()) {
NioSocketConnector connector = new NioSocketConnector();
connector.setConnectTimeoutMillis(CONNECT_TIMEOUT);
connector.getFilterChain().addLast("executor", new
ExecutorFilter());
connector.getFilterChain().addLast("logger", new LoggingFilter());
CodecFactory cf = new CodecFactory();
cf.setDecoderMaxObjectSize(BUFFER_SIZE);
connector.getFilterChain().addLast("codec", new CodecFilter(cf));
/*
ObjectSerializationCodecFactory oscf = new
ObjectSerializationCodecFactory();
oscf.setDecoderMaxObjectSize(BUFFER_SIZE);
connector.getFilterChain().addLast("codec", new CodecFilter(oscf));
*/
StreamWriteFilter swf = new StreamWriteFilter();
swf.setWriteBufferSize(BUFFER_SIZE);
connector.getFilterChain().addLast("file", swf);
connector.setHandler(new ClientIoSessionHandler(flag, this));
// connector.getSessionConfig().setMaxReadBufferSize(BUFFER_SIZE);
// connector.getSessionConfig().setReadBufferSize(BUFFER_SIZE);
for (int i=0;i< CONNECT_RETRIES ; i++ ) {
try {
ConnectFuture future = connector.connect(new
InetSocketAddress(this.hostname, this.port));
future.awaitUninterruptibly();
session = future.getSession();
break;
} catch (RuntimeIoException e) {
LOGGER.error("Failed to connect.", e);
Thread.sleep(5000);
}
}
if (session != null) {
session.getConfig().setIdleTime(IdleStatus.BOTH_IDLE, 600);
session.setAttribute(HOME_DIR, homedir);
session.getCloseFuture().awaitUninterruptibly();
}
connector.dispose();
}
}
public void disConnect() {
if (session != null) {
session.close(true);
if (trayicon != null)
trayicon.setToolTip("Status: Disconnected");
}
}
public Boolean sendMessage(FileCmd.CmdType cmd, Object in) {
if (session.getAttribute(SESSION_STATE) == SessionState.IDLE) {
if (cmd == FileCmd.CmdType.LIST) {
session.setAttribute(SESSION_STATE, SessionState.WAITING);
RequestDirectoryTreeMessage myRequest = new
RequestDirectoryTreeMessage();
myRequest.setNonce((Long)session.getAttribute(SESSION_NONCE));
session.write(myRequest);
return true;
}
else if (cmd == FileCmd.CmdType.DOWNLOAD) {
File myfile = (File) in;
DownloadFileMessage myrequest = new DownloadFileMessage();
myrequest.setRequest(myfile);
myrequest.setNonce((Long)session.getAttribute(SESSION_NONCE));
session.setAttribute(CURRENT_FILE, myfile);
session.setAttribute(SESSION_STATE,SessionState.RECV_FILE_WAIT);
session.write(myrequest);
return true;
}
else if (cmd == FileCmd.CmdType.UPLOAD) {
File myfile = (File) in;
session.setAttribute(SESSION_STATE, SessionState.SEND_FILE);
UploadFileMessage myupload = new UploadFileMessage();
myupload.setRequest(myfile);
myupload.setNonce((Long)session.getAttribute(SESSION_NONCE));
session.setAttribute(CURRENT_FILE, myfile);
session.write(myupload);
return true;
}
else if (cmd == FileCmd.CmdType.R_CREATE) {
File myfile = (File) in;
session.setAttribute(SESSION_STATE, SessionState.WAITING);
CreateDirectoryMessage myrequest = new CreateDirectoryMessage();
myrequest.setRequest(myfile);
myrequest.setNonce((Long)session.getAttribute(SESSION_NONCE));
session.setAttribute(CURRENT_FILE, myfile);
session.write(myrequest);
return true;
}
else if (cmd == FileCmd.CmdType.DELETE) {
File myfile = (File) in;
session.setAttribute(SESSION_STATE, SessionState.WAITING);
DeleteFileMessage myrequest = new DeleteFileMessage();
myrequest.setRequest(myfile);
myrequest.setNonce((Long)session.getAttribute(SESSION_NONCE));
session.setAttribute(CURRENT_FILE, myfile);
session.write(myrequest);
return true;
}
else
return false;
} else
return false;
}
public FileDirTree getTreeMap() {
if (session.containsAttribute(FILEDIR_TREE))
return (FileDirTree) session.getAttribute(FILEDIR_TREE);
else
return null;
}
public void setTreeMap(FileDirTree intree) {
if (session.containsAttribute(FILEDIR_TREE) && intree != null)
session.setAttribute(FILEDIR_TREE, intree);
else
session.removeAttribute(FILEDIR_TREE);
}
// @Override
protected Void doInBackground() {
try {
//TODO: Thread locked... need to move to multi-threaded model
this.Connect(flag);
return null;
} catch (InterruptedException f) {
LOGGER.error("Connection Interrupted", f);
return null;
}
}
}
-----------------------------------------------------------------
ClientIoSessionHandler.java
public class ClientIoSessionHandler extends IoHandlerAdapter {
private final Client parent;
private Boolean flag = true; //Sends a correct announcement or not
private final static Logger LOGGER =
LoggerFactory.getLogger(ClientIoSessionHandler.class);
public ClientIoSessionHandler(Boolean flag, Client parent) {
this.flag = flag;
this.parent = parent;
}
@Override
public void sessionOpened(IoSession session) {
// ClientAnnounceMessage msg = new ClientAnnounceMessage(65530,
compname.length(), compname, (String)session.getAttribute(USERNAME),
(String)session.getAttribute(PASSWORD));
String method = "sessionOpened";
session.setAttribute(MSG_COUNTER, 0L);
ClientAnnounceMessage msg =
new ClientAnnounceMessage(65530, parent.getComputername().length(),
parent.getComputername(),
parent.getUsername(), "test123");
session.setAttribute(SESSION_STATE, SessionState.WAIT_INIT);
session.setAttribute(USERNAME, parent.getHostname());
// session.setAttribute(PASSWORD, parent.getPassword());
session.setAttribute(PASSWORD, "test123");
session.setAttribute(CLIENT_CODE, 65530);
session.setAttribute(CLIENT_NAME, parent.getHostname());
if (parent.getTrayicon() != null)
parent.getTrayicon().setToolTip("Status: Connected");
if (flag)
session.write(msg);
else
session.write("Hello There");
LOGGER.info("MSG Counter: " + session.getAttribute(MSG_COUNTER) + "
Method: " + method + " State: " + session.getAttribute(SESSION_STATE));
}
@Override
public void messageReceived(IoSession session, Object message) {
String method = "messageReceived";
if (session.getAttribute(SESSION_STATE) == SessionState.WAIT_INIT) {
if (message instanceof ServerStdResponseMessage) {
loggerHelper(session, method, message);
ServerStdResponseMessage myMsg =
(ServerStdResponseMessage)message;
if (myMsg.getResult() == 1) {
session.setAttribute(SESSION_NONCE, myMsg.getNonce());
session.setAttribute(SESSION_STATE, SessionState.IDLE);
LOGGER.info("Connection with the server established.
Result: " + myMsg.getResult());
for (int i = 0; i < 5; i++) {
if (parent.sendMessage(FileCmd.CmdType.LIST, null)) {
break;
}
}
} else {
loggerHelper(session, method, message);
LOGGER.error("Couldn't initialize connection with server.
Error: " + myMsg.getResult());
session.close(true);
}
}
} else if (session.getAttribute(SESSION_STATE) == SessionState.WAITING)
{
if (message instanceof FileDirTree) {
loggerHelper(session, method, message);
FileDirTree myTree = (FileDirTree)message;
session.setAttribute(FILEDIR_TREE, myTree);
session.setAttribute(SESSION_STATE, SessionState.IDLE);
LOGGER.info("Received Directory Tree");
} else if (message instanceof ServerStdResponseMessage) {
loggerHelper(session, method, message);
ServerStdResponseMessage myMsg =
(ServerStdResponseMessage)message;
session.setAttribute(SESSION_NONCE, myMsg.getNonce());
session.setAttribute(SESSION_STATE, SessionState.IDLE);
if (myMsg.getResult() == 1)
LOGGER.info("We got an OK response from the server. Result:
" + myMsg.getResult());
else
loggerHelper(session, method, message);
LOGGER.warn("Not the response we were expecting. Result: "
+ myMsg.getResult());
}
} else if (session.getAttribute(SESSION_STATE) ==
SessionState.RECV_FILE) {
File myfile = (File)session.getAttribute(CURRENT_FILE);
if (message instanceof IoBuffer) {
// if (message instanceof FileInputStream) {
loggerHelper(session, method, message);
IoBuffer fbuff = (IoBuffer)message;
InputStream frecv = fbuff.asInputStream();
FileOutputStream fout;
try {
if (session.containsAttribute(FILE_OUT))
fout = (FileOutputStream)session.getAttribute(FILE_OUT);
else
fout = new FileOutputStream(new
File(((File)session.getAttribute(HOME_DIR)).getPath().concat(myfile.getPath())),
false);
int i = 0;
while ((i = frecv.read()) != -1)
fout.write(i);
frecv.close();
fout.close();
} catch (FileNotFoundException e) {
LOGGER.error("Couldn't write to file: " + myfile.getName()
+ " Exception: " + e.getMessage());
session.removeAttribute(CURRENT_FILE);
session.removeAttribute(FILE_OUT);
session.setAttribute(SESSION_STATE, SessionState.IDLE);
} catch (IOException e) {
LOGGER.error("Couldn't write to file: " + myfile.getName()
+ " Exception: " + e.getMessage());
session.removeAttribute(CURRENT_FILE);
session.removeAttribute(FILE_OUT);
session.setAttribute(SESSION_STATE, SessionState.IDLE);
}
}
else if (message instanceof ServerStdResponseMessage) {
loggerHelper(session, method, message);
ServerStdResponseMessage myMsg =
(ServerStdResponseMessage)message;
session.setAttribute(SESSION_NONCE, myMsg.getNonce());
LOGGER.error("Expecting file: " + myfile.getPath() +
" got standard response message instead - Result:
" + myMsg.getResult());
session.removeAttribute(FILE_OUT);
session.removeAttribute(CURRENT_FILE);
session.setAttribute(SESSION_STATE, SessionState.IDLE);
} else {
if (session.containsAttribute(FILE_OUT)) {
loggerHelper(session, method, message);
FileOutputStream fout;
fout = (FileOutputStream)session.getAttribute(FILE_OUT);
try {
fout.close();
} catch (IOException e) {
LOGGER.error("Couldn't close file: " +
session.getAttribute(CURRENT_FILE).toString());
}
session.removeAttribute(CURRENT_FILE);
session.removeAttribute(FILE_OUT);
session.setAttribute(SESSION_STATE, SessionState.IDLE);
LOGGER.info("Didn't get an IOBuffer, so we are assuming end
of transmission State: " +
session.getAttribute(SESSION_STATE));
}
else {
loggerHelper(session, method, message);
LOGGER.error("Didn't receive the actual file or response
message " + myfile.getName() + " Message: " +
message.getClass().toString());
session.removeAttribute(CURRENT_FILE);
session.setAttribute(SESSION_STATE, SessionState.IDLE);
}
}
} else if (session.getAttribute(SESSION_STATE) ==
SessionState.SEND_FILE) {
if (message instanceof ServerStdResponseMessage) {
loggerHelper(session, method, message);
ServerStdResponseMessage myMsg =
(ServerStdResponseMessage)message;
session.setAttribute(SESSION_NONCE, myMsg.getNonce());
if (myMsg.getResult() == 1) {
FileInputStream fsend = null;
try {
fsend = new FileInputStream(new File(testfile2));
session.setAttribute(FILE_STREAM, fsend);
session.write(fsend);
} catch (FileNotFoundException e) {
session.removeAttribute(CURRENT_FILE);
session.setAttribute(SESSION_STATE, SessionState.IDLE);
}
} else {
session.removeAttribute("current_file");
session.setAttribute(SESSION_STATE, SessionState.IDLE);
LOGGER.error("Received a negative upload response from
server. Result: " + myMsg.getResult());
}
}
else {
loggerHelper(session, method, message);
}
} else if (session.getAttribute(SESSION_STATE) == SessionState.IDLE) {
if (message instanceof ServerStdResponseMessage) {
loggerHelper(session, method, message);
ServerStdResponseMessage myMsg =
(ServerStdResponseMessage)message;
session.setAttribute(SESSION_NONCE, myMsg.getNonce());
session.setAttribute(SESSION_STATE, SessionState.IDLE);
LOGGER.error("Received response without a request. Result: " +
myMsg.getResult());
}
else {
loggerHelper(session, method, message);
}
}
//This should never happen
else {
loggerHelper(session, method, message);
}
}
@Override
public void messageSent(IoSession session, Object message) {
String method = "messageSent";
//Streams are not closed by the file filter automatically, so we must
close them here
if (session.getAttribute(SESSION_STATE) == SessionState.SEND_FILE &&
session.containsAttribute(FILE_STREAM)) {
loggerHelper(session, method, message);
FileInputStream fsend;
fsend = (FileInputStream)session.getAttribute(FILE_STREAM);
try {
fsend.close();
session.setAttribute(SESSION_STATE, SessionState.IDLE);
} catch (IOException e) {
LOGGER.error("Unable to close stream: " + e.getMessage());
session.setAttribute(SESSION_STATE, SessionState.IDLE);
}
} else if (session.getAttribute(SESSION_STATE) ==
SessionState.RECV_FILE_WAIT) {
loggerHelper(session, method, message);
session.setAttribute(SESSION_STATE, SessionState.RECV_FILE);
}
else {
loggerHelper(session, method, message);
}
}
@Override
public void sessionIdle(IoSession session, IdleStatus status) {
LOGGER.info("Session idle... disconnecting.");
if (parent.getTrayicon() != null)
parent.getTrayicon().setToolTip("Status: Disconnected");
session.close(true);
}
@Override
public void exceptionCaught(IoSession session, Throwable cause) {
//If this is a NotSerializableException we should continue and hope for
the best ;-)
if (cause.getClass() == ProtocolDecoderException.class ||
cause.getClass() == ProtocolEncoderException.class) {
LOGGER.warn("Exception: " + cause.getClass().toString() + " State:
" +
session.getAttribute(SESSION_STATE));
LOGGER.error("Throwable: ", cause);
}
else {
//Let's close the connection since we can't recover
// LOGGER.error("Unrecoverable Exception on Session: " +
session.getAttribute(CLIENT_NAME), cause);
LOGGER.error("Unrecoverable Exception on Session: " +
session.getAttribute(CLIENT_NAME) + " Exception: " +
cause.getClass().toString());
LOGGER.error("Stack: " + cause.getMessage());
LOGGER.error("Throwable", cause);
if (parent.getTrayicon() != null)
parent.getTrayicon().setToolTip("Status: Disconnected");
session.close(true);
}
}
private void loggerHelper(IoSession session, String method, Object message)
{
session.setAttribute(MSG_COUNTER,
(Long)session.getAttribute(MSG_COUNTER) + 1);
LOGGER.info("MSG Counter: " + session.getAttribute(MSG_COUNTER)
+ " Method: " + method + " State: " + session.getAttribute(SESSION_STATE) + "
Message Class: " + message.getClass().toString());
}
}
--------------------------------------------------------
Server.java
public class Server {
private final static Logger LOGGER = LoggerFactory.getLogger(Server.class);
private final static UserStore userstore = new UserStore();
public Server() {
super();
}
public static void main(String[] args) throws Throwable {
//Default Port and Address
int SERVER_PORT = 8877;
String SERVER_ADDRESS = "127.0.0.1";
NioSocketAcceptor acceptor = new NioSocketAcceptor();
//Let's keep events from firing at the wrong time or simultaneously
acceptor.getFilterChain().addLast("executor", new ExecutorFilter());
//We want to instert a logging filter
acceptor.getFilterChain().addLast("logger", new LoggingFilter());
CodecFactory cf = new CodecFactory();
cf.setDecoderMaxObjectSize(BUFFER_SIZE);
acceptor.getFilterChain().addLast("codec", new CodecFilter(cf));
StreamWriteFilter swf = new StreamWriteFilter();
swf.setWriteBufferSize(BUFFER_SIZE);
acceptor.getFilterChain().addLast("file", swf);
acceptor.setHandler(new ServerIoSessionHandler());
// acceptor.getSessionConfig().setMaxReadBufferSize(BUFFER_SIZE);
// acceptor.getSessionConfig().setReadBufferSize(BUFFER_SIZE);
//Did we get non-default host and port arguments
if (!(args.length < 2 || args.length > 2)) {
//Assign the port number
try {
SERVER_PORT = Integer.parseInt(args[1]);
} catch (NumberFormatException e) {
LOGGER.error(e.getMessage());
}
//Only allow ports in the user range
if (SERVER_PORT < 1024 || SERVER_PORT > 65535) {
LOGGER.error("Invalid port number: " + SERVER_PORT);
System.exit(1);
}
//TODO: Validate the host address
SERVER_ADDRESS = args[0];
}
//Let's bind to our address and port number. Give it 5 tries...
for (int i=0; i<5; i++)
try {
acceptor.bind(new InetSocketAddress(SERVER_ADDRESS, SERVER_PORT));
break;
} catch (IOException ioe) {
// TODO: Add catch code
LOGGER.error("Server was unable to bind to port: " + SERVER_PORT +
" Error: " + ioe.getMessage());
Thread.sleep(1000);
}
if (acceptor.isActive())
//We are done
LOGGER.info("Server started on address and port: " + SERVER_ADDRESS
+ ":" + SERVER_PORT);
else {
LOGGER.info("Server could not start");
System.exit(1);
}
}
}
----------------------------------------------------------
ServerIoSessionHandler.java
public class ServerIoSessionHandler extends IoHandlerAdapter {
private final static Logger LOGGER =
LoggerFactory.getLogger(ServerIoSessionHandler.class);
@Override
public void sessionOpened(IoSession session) {
String method = "sessionOpened";
session.setAttribute(MSG_COUNTER, 0L);
//Set our idle time to 600 seconds
session.getConfig().setIdleTime(IdleStatus.BOTH_IDLE, 600);
//Our local directory tree
//TODO: This should be getting loaded from storage and not created on
connect
FileDirTree localdt = new FileDirTree();
session.setAttribute(HOME_DIR, "/users/admin/Downloads");
try {
localdt.setRootDir(new
File(session.getAttribute(HOME_DIR).toString()));
session.setAttribute(FILEDIR_TREE, localdt);
} catch (InvalidParameterException e) {
LOGGER.error("Invalid File() - Closing Session", e);
session.close(true);
} catch (InvalidObjectException e) {
LOGGER.error("Invalid FileDirTree.setRoot() - Closing Session",
e);
session.close(true);
}
//Let's keep a session nonce
session.setAttribute(SESSION_NONCE, generateNonce());
//Initialize our state
session.setAttribute(SESSION_STATE, SessionState.WAIT_INIT);
LOGGER.info("MSG Counter: " + session.getAttribute(MSG_COUNTER) + "
Method: " + method + " State: " + session.getAttribute(SESSION_STATE));
}
@Override
public void messageReceived(IoSession session, Object message) {
String method = "messageReceived";
//TODO: Add busy state with appropriate client response
if (session.getAttribute(SESSION_STATE) == SessionState.WAIT_INIT) {
if (message instanceof ClientAnnounceMessage) {
loggerHelper(session, method, message);
ClientAnnounceMessage myClient = (ClientAnnounceMessage)
message;
session.setAttribute(CLIENT_CODE, myClient.getClient_codes());
session.setAttribute(CLIENT_NAME, myClient.getClient_name());
session.setAttribute(USERNAME, myClient.getUsername());
if (true) {
sendStdResponseMessage(session, true, null);
session.setAttribute(SESSION_STATE, SessionState.WAIT_CMD);
}
else {
LOGGER.info("Unable to authenticate user: " +
session.getAttribute(USERNAME));
session.close(true);
}
}
else {
loggerHelper(session, method, message);
sendStdResponseMessage(session, false, "Expected
ClientAnnounceMessage, got: " + message.getClass().toString());
}
}
else if (session.getAttribute(SESSION_STATE) == SessionState.WAIT_CMD) {
if (message instanceof RequestDirectoryTreeMessage) {
loggerHelper(session, method, message);
session.write(session.getAttribute(FILEDIR_TREE));
}
else if (message instanceof DownloadFileMessage) {
loggerHelper(session, method, message);
session.setAttribute(SESSION_STATE, SessionState.SEND_FILE);
DownloadFileMessage filerequest = (DownloadFileMessage) message;
FileDirTree localdt = (FileDirTree)
session.getAttribute(FILEDIR_TREE);
if (localdt.getLocalFile(filerequest.getRequest()) != null) {
FileInputStream fsend = null;
try {
fsend = new
FileInputStream(localdt.getLocalFile(filerequest.getRequest()));
session.setAttribute(FILE_STREAM, fsend);
// WriteFuture future = session.write(fsend);
// future.awaitUninterruptibly();
session.write(fsend);
} catch (FileNotFoundException e) {
sendStdResponseMessage(session, false, "Unable to send
file: " + filerequest.getRequest().getName()
+ "\nException: " +
e.getMessage());
session.setAttribute(SESSION_STATE,
SessionState.WAIT_CMD);
}
}
else {
sendStdResponseMessage(session, false, "File not found in
local tree");
session.setAttribute(SESSION_STATE, SessionState.WAIT_CMD);
}
}
else if (message instanceof UploadFileMessage) {
loggerHelper(session, method, message);
UploadFileMessage uploadfile = (UploadFileMessage) message;
if (uploadfile.getRequest() != null && uploadfile.getRequest()
instanceof File) {
session.setAttribute(CURRENT_FILE, uploadfile.getRequest());
session.setAttribute(SESSION_STATE, SessionState.RECV_FILE);
sendStdResponseMessage(session, true, null);
else {
sendStdResponseMessage(session, false, "Invalid file upload
message: " + uploadfile.toString());
}
}
else if (message instanceof CreateDirectoryMessage) {
loggerHelper(session, method, message);
CreateDirectoryMessage createdir = (CreateDirectoryMessage)
message;
if (createdir.getRequest() != null && createdir.getRequest()
instanceof File) {
session.setAttribute(CURRENT_FILE, createdir.getRequest());
session.setAttribute(SESSION_STATE, SessionState.BUSY);
File f = (File) createdir.getRequest();
if (f.exists())
sendStdResponseMessage(session, false, "File already
exists: " + createdir.toString());
else if (!f.mkdirs()) {
sendStdResponseMessage(session, false, "Unable to
create directory: " + createdir.toString());
}
else {
sendStdResponseMessage(session, true, "Created
directory: " + createdir.toString());
}
session.setAttribute(SESSION_STATE, SessionState.WAIT_CMD);
}
else {
sendStdResponseMessage(session, false, "Invalid request
payload: " + createdir.toString());
session.setAttribute(SESSION_STATE, SessionState.WAIT_CMD);
}
}
else if (message instanceof DeleteFileMessage) {
loggerHelper(session, method, message);
DeleteFileMessage deletefile = (DeleteFileMessage) message;
if (deletefile.getRequest() != null && deletefile.getRequest()
instanceof File) {
session.setAttribute(CURRENT_FILE, deletefile.getRequest());
session.setAttribute(SESSION_STATE, SessionState.BUSY);
File f = (File) deletefile.getRequest();
if (!f.exists())
sendStdResponseMessage(session, false, "File doesn't
exist: " + deletefile.toString());
else if (f.isDirectory() && f.listFiles() != null) {
sendStdResponseMessage(session, false, "Directory is
not empty: " + deletefile.toString());
}
else if (f.delete()) {
sendStdResponseMessage(session, true, "Deleted: " +
deletefile.toString());
}
else
sendStdResponseMessage(session, false, "Could not
delete: " + deletefile.toString());
session.setAttribute(SESSION_STATE, SessionState.WAIT_CMD);
}
else {
sendStdResponseMessage(session, false, "Invalid request
payload: " + deletefile.toString());
session.setAttribute(SESSION_STATE, SessionState.WAIT_CMD);
}
}
else {
loggerHelper(session, method, message);
sendStdResponseMessage(session, false, "Command not recognized.
CMD: " + message.getClass().toString());
}
} else if (session.getAttribute(SESSION_STATE) ==
SessionState.RECV_FILE) {
File myfile = (File)session.getAttribute(CURRENT_FILE);
if (message instanceof IoBuffer) {
loggerHelper(session, method, message);
IoBuffer fbuff = (IoBuffer)message;
InputStream frecv = fbuff.asInputStream();
FileOutputStream fout;
try {
fout = new FileOutputStream(new
File("/users/admin/filetest-srv.txt"), false);
int i = 0;
while ((i = frecv.read()) != -1)
fout.write(i);
frecv.close();
fout.close();
session.setAttribute(SESSION_STATE, SessionState.WAIT_CMD);
} catch (FileNotFoundException e) {
LOGGER.error("Couldn't write to file: " + myfile.getName()
+ " Exception: " + e.getMessage());
session.removeAttribute(CURRENT_FILE);
session.setAttribute(SESSION_STATE, SessionState.WAIT_CMD);
} catch (IOException e) {
LOGGER.error("Couldn't write to file: " + myfile.getName()
+ " Exception: " + e.getMessage());
session.removeAttribute(CURRENT_FILE);
session.setAttribute(SESSION_STATE, SessionState.WAIT_CMD);
}
} else {
loggerHelper(session, method, message);
LOGGER.error("Didn't receive the actual file or response
message " + myfile.getName() + " Message: " +
message.getClass().toString());
session.removeAttribute(CURRENT_FILE);
session.setAttribute(SESSION_STATE, SessionState.WAIT_CMD);
}
}
//This should never happen
else {
loggerHelper(session, method, message);
sendStdResponseMessage(session, false, "Invalid session state: " +
session.getAttribute(SESSION_STATE).toString());
}
}
@Override
public void messageSent(IoSession session, Object message) {
String method = "messageSent";
//Streams are not closed by the file filter automatically, so we must
close them here
if(session.getAttribute(SESSION_STATE) == SessionState.SEND_FILE &&
session.containsAttribute(FILE_STREAM)) {
loggerHelper(session, method, message);
FileInputStream fsend;
fsend = (FileInputStream) session.getAttribute(FILE_STREAM);
try {
fsend.close();
LOGGER.info("Closed Stream");
} catch (IOException e) {
LOGGER.error("Unable to close stream: " + e.getMessage());
}
session.removeAttribute(FILE_STREAM);
session.setAttribute(SESSION_STATE, SessionState.WAIT_CMD);
}
else {
loggerHelper(session, method, message);
}
}
//Generate a nonce
private long generateNonce() {
SecureRandom rnd = new SecureRandom();
return rnd.nextLong();
}
//Have we been idel beyond our timeout? Then disconnect
@Override
public void sessionIdle(IoSession session, IdleStatus status) {
LOGGER.info("Session idle... disconnecting.");
session.close(true);
}
//Log any unrecoverable exceptions and close the connection
@Override
public void exceptionCaught(IoSession session, Throwable cause) {
//If this is a NotSerializableException we should continue and hope for
the best ;-)
if (cause.getClass() == ProtocolDecoderException.class ||
cause.getClass() == ProtocolEncoderException.class)
LOGGER.warn("Exception: " + cause.getClass().toString() + " State:
" + session.getAttribute(SESSION_STATE));
else {
//Let's close the connection since we can't recover
// LOGGER.error("Unrecoverable Exception on Session: " +
session.getAttribute(CLIENT_NAME), cause);
LOGGER.error("Unrecoverable Exception on Session: " +
session.getAttribute(CLIENT_NAME) + " Exception: " +
cause.getClass().toString());
session.close(true);
}
}
//Helper method to send standard server response messages
private void sendStdResponseMessage(IoSession session, Boolean flag, String
msg) {
if (session != null) {
ServerStdResponseMessage ssrm;
ssrm = new ServerStdResponseMessage();
ssrm.setNonce((Long)session.getAttribute(SESSION_NONCE) + 1);
if (flag)
ssrm.setResult((short) 1);
else
ssrm.setResult((short) 0);
session.write(ssrm);
}
if (msg != null)
LOGGER.error(msg);
}
private void loggerHelper(IoSession session, String method, Object message)
{
session.setAttribute(MSG_COUNTER,
(Long)session.getAttribute(MSG_COUNTER) + 1);
LOGGER.info("MSG Counter: " + session.getAttribute(MSG_COUNTER)
+ " Method: " + method + " State: " + session.getAttribute(SESSION_STATE) + "
Message Class: " + message.getClass().toString());
}
}
--------------------------------------------------
CodecFactory.java
public class CodecFactory extends ObjectSerializationCodecFactory {
private final CodecEncoder encoder;
private final CodecDecoder decoder;
public CodecFactory() {
this(Thread.currentThread().getContextClassLoader());
}
public CodecFactory(ClassLoader classLoader) {
encoder = new CodecEncoder();
decoder = new CodecDecoder(classLoader);
}
@Override
public ProtocolEncoder getEncoder(IoSession session) {
return encoder;
}
@Override
public ProtocolDecoder getDecoder(IoSession session) {
return decoder;
}
@Override
public int getEncoderMaxObjectSize() {
return encoder.getMaxObjectSize();
}
@Override
public void setEncoderMaxObjectSize(int maxObjectSize) {
encoder.setMaxObjectSize(maxObjectSize);
}
@Override
public int getDecoderMaxObjectSize() {
return decoder.getMaxObjectSize();
}
@Override
public void setDecoderMaxObjectSize(int maxObjectSize) {
decoder.setMaxObjectSize(maxObjectSize);
}
}
-----------------------------------
CodecFilter.java
public class CodecFilter extends ProtocolCodecFilter {
public CodecFilter(Class<? extends ProtocolEncoder> class1, Class<? extends
ProtocolDecoder> class2) {
super(class1, class2);
}
public CodecFilter(ProtocolEncoder protocolEncoder, ProtocolDecoder
protocolDecoder) {
super(protocolEncoder, protocolDecoder);
}
public CodecFilter(ProtocolCodecFactory protocolCodecFactory) {
super(protocolCodecFactory);
}
@Override
public void filterWrite(NextFilter nextFilter, IoSession session,
WriteRequest writeRequest) throws Exception {
if (writeRequest.getMessage() instanceof FileInputStream)
// if (session.getAttribute("state") == SessionState.RECV_FILE ||
session.getAttribute("state") == SessionState.SEND_FILE)
nextFilter.filterWrite(session, writeRequest);
else
super.filterWrite(nextFilter, session, writeRequest);
}
}
---------------------------------
CodecDecoder.java
public class CodecDecoder extends ObjectSerializationDecoder {
private final ClassLoader classLoader;
private int maxObjectSize = 1048576;
public CodecDecoder() {
this(Thread.currentThread().getContextClassLoader());
}
public CodecDecoder(ClassLoader classLoader) {
if (classLoader == null) {
throw new IllegalArgumentException("classloader");
}
this.classLoader = classLoader;
}
@Override
protected boolean doDecode(IoSession session, IoBuffer in,
ProtocolDecoderOutput out) throws Exception {
if (session.getAttribute("state") == SessionState.RECV_FILE ||
session.getAttribute("state") == SessionState.SEND_FILE)
return false;
else {
return super.doDecode(session, in, out);
}
}
}
----------------------------------
CodecEncoder.java
public class CodecEncoder extends ObjectSerializationEncoder {
private int maxObjectSize = Integer.MAX_VALUE;
public CodecEncoder() {
super();
}
@Override
public void encode(IoSession session, Object message, ProtocolEncoderOutput
out) throws Exception {
if (session.getAttribute("state") == SessionState.RECV_FILE ||
session.getAttribute("state") == SessionState.SEND_FILE)
return;
else
super.encode(session, message, out);
}
}