Hi all,

I am using Akka to create a chat application.

I start the TCP socket using StartTcpConnectionActor and after a new 
connnetion is established I start the TcpConnectionActor for each 
connnection.

My android client, uses the Socket class to connect to the server and 
everything works ok.

But I have some problems to finish the connection.

In my client, if I call socket.close() , I receive Tcp.PeerClosed on my 
server, so I can finish the connection.

But if the user turn Off the Wifi on his device, the server didn´t receive 
any error. 

The onReceive() method on my TcpConnectionActor is not even called.. So in 
my server, Akka doesnt know that my user is offline without any Wifi 
connection...

Is this the expected behavior? If the device turns the Wifi Off, it should 
break the socket connection, or not?

thank you

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
package br.livetouch.livecom.chatAkka.actor;

import java.net.InetSocketAddress;

import org.apache.log4j.Logger;

import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.io.Tcp;
import akka.io.TcpMessage;
import br.infra.util.Log;

public class StartTcpConnectionActor extends UntypedActor {

	private static Logger log = Log.getLogger("TcpConnectionActor");

	public static class Start {
		public int port;

		public Start(int port) {
			this.port = port;
		}
	}

	@Override
	public void onReceive(Object msg) throws Exception {
		if (msg instanceof Start) {
			// Faz start do AKKA server
			log.debug("Akka Server - Bind to socket:" + ((Start) msg).port);
			final ActorRef tcp = Tcp.get(getContext().system()).manager();

			tcp.tell(TcpMessage.bind(getSelf(), new InetSocketAddress(((Start) msg).port), 100), getSelf());

		} else if (msg instanceof Tcp.Connected) {
			// Cliente conectou o socket
			log.error("Akka Server - Ator conectado: " + getSender());
			
			// Cria o ator para leitura TCP do Socket
			ActorRef tcpUserActor = getContext().actorOf(Props.create(TcpConnectionActor.class, getSender()));

			ActorRef connectionActor = getSender();
			connectionActor.tell(TcpMessage.register(tcpUserActor), getSelf());
		} else if (msg instanceof Tcp.CommandFailed) {
			log.error("Akka Server - Error to bind to port");
			getContext().stop(self());
		} else {
			log.error("StartTcpConnectionActor ERROR! " + msg);
			System.err.println("StartTcpConnectionActor ERROR! " + msg);
		}
	}
}
package br.livetouch.livecom.chatAkka.actor;

import java.io.UnsupportedEncodingException;

import org.apache.log4j.Logger;

import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.UntypedActor;
import akka.io.Tcp;
import akka.io.Tcp.Received;
import akka.io.Tcp.Write;
import akka.util.ByteString;
import br.infra.util.Log;
import br.livetouch.livecom.chatAkka.actor.UserActor.Stop;
import br.livetouch.livecom.chatAkka.protocol.AkkaJson;
import br.livetouch.livecom.chatAkka.protocol.JsonRawMessage;
import br.livetouch.livecom.chatAkka.protocol.RawMessage;

/**
 * Ator para escrever no socket.
 * 
 * Faz o loop de leitura das mensagens e envia pro UserMessageRouter
 * 
 * @author rlech
 *
 */
public class TcpConnectionActor extends UntypedActor {

	private static Logger log = Log.getLogger("TcpConnectionActor");

	public enum State {
		Idle, StartReadHeader, StartReadMsg;
	}

	private boolean continueToReadSplitMessage = false;
	private ByteString msgPartsContinue = null;
	private int sizeContinue = 0;
	private int totalBytesReaded = 0;

	public ActorSelection userMessageRouter;
	/**
	 * UserActor criado quando fez a conexão TCP.
	 * É o UserActor que contém várias conexões.
	 */
	public ActorRef userActorWhenLogged;
	public Long userId;
	public String userSo;

	/**
	 * Ator da conexão. Contém o socket para escrever.
	 */
	public ActorRef tcpConnection;
	private int nextPercent;

	/**
	 * Construtor chamado por reflexão
	 * 
	 * @param connection
	 */
	public TcpConnectionActor(ActorRef connection) {
		this.tcpConnection = connection;
	}

	@Override
	public void preStart() throws Exception {
		userMessageRouter = AkkaFactory.getUserMessageRouter(getContext());
		super.preStart();
	}

	@Override
	public void onReceive(Object msg) throws Exception {
		try {
			if (msg instanceof Received) {

				ByteString data = ((Received) msg).data();
				receiveData(data);
				return;
				
			} else if (msg instanceof Stop) {
				log("TcpUserActor Stop");

				getContext().stop(getSelf());
			} else if (msg instanceof Tcp.PeerClosed$) {
				// Fechou socket com socket.close
				log("TcpUserActor PeerClosed");
				stop();
			} else if (msg instanceof Tcp.ErrorClosed) {
				log("TcpUserActor ErrorClosed");
				stop();
			} else if (msg instanceof Tcp.Closed$) {
				log("TcpUserActor Closed");
				stop();
			} else if (msg instanceof Tcp.ConfirmedClosed$) {
				log("TcpUserActor ConfirmedClosed");
				stop();
			} else if (msg instanceof Tcp.Aborted$) {
				log("TcpUserActor Aborted");
				stop();
			} else if (msg instanceof Write) {
//				
				/**
				 * Escreve os dados enviados no TCP Socket
				 */
				tcpConnection.tell(msg, getSelf());
			} else if (msg instanceof AssociateWithUser) {
				userActorWhenLogged = ((AssociateWithUser) msg).userActor;
				userId = ((AssociateWithUser) msg).userId;
				userSo = ((AssociateWithUser) msg).userSo;
				log("TcpUserActor Associando conexão com usuário ["+userId+"/"+userSo+"]: " + userActorWhenLogged);
			} else {
				System.err.println("TcpUserActor else: " + msg);
				log("TcpUserActor else: " + msg);
			}
		} catch(Exception ex) {
			ex.printStackTrace();
			log.error("TcpConnectionActor.onReceive: " + ex.getMessage(), ex);
		}
	}

	private void receiveData(ByteString dataReceived) {
		
		int lengghtBytesReceived = 0; 
		try {
			byte[] bytes = dataReceived.toArray();
			lengghtBytesReceived = bytes.length;
			String sreceived = new String(bytes,"UTF-8");
			System.err.println(">> TcpConnectionActor.receiveData ["+bytes.length+"]: " + sreceived);
		} catch (UnsupportedEncodingException e) {
			e.printStackTrace();
		}
		
		boolean reading = true;
		ByteString msgParts = ByteString.empty();
		State state = State.Idle;
		int size = 0;

		do {
			if (state == State.Idle) {
				if (dataReceived.length() > 0) {
					state = State.StartReadHeader;
					if (continueToReadSplitMessage) {
						continueToReadSplitMessage = false;
						msgParts = msgPartsContinue;
						size = sizeContinue;
						state = State.StartReadMsg;
					}
				} else {
					reading = false;
				}
			} else if (state == State.StartReadHeader) {
				ByteString firstPart = dataReceived.slice(0, 4 - msgParts.length());
				msgParts = msgParts.concat(firstPart);
				if (msgParts.length() >= 4) {
					state = State.StartReadMsg;
					size = (int) RawMessage.headerBytesToSize(msgParts.slice(0, 4).toArray());
					dataReceived = dataReceived.slice(4, dataReceived.length());
					log("TcpUserActor - StartReadHeader size " + size);
				} else {
					reading = false;
				}
			} else if (state == State.StartReadMsg) {
				log("TcpUserActor - StartReadMsg");

				ByteString contents = dataReceived.slice(0, size - (msgParts.length() - 4));
				msgParts = msgParts.concat(contents);

				dataReceived = dataReceived.slice(msgParts.length() - 4, dataReceived.length());
				if (msgParts.length() >= 4 + size) {
					log("TcpUserActor - StartReadMsg FSM:Frame Reached");
					ByteString frame = msgParts.slice(0, size + 4);

					/**
					 * FIM LEITURA
					 */
					if(nextPercent > 80) {
						sendProgressUploadFile(lengghtBytesReceived, size);
					}

					// Envia o arquivo lido pro Router
					sendMsgToRouter(frame);

					// Zera dados para proxima leitura
					size = 0;
					msgParts = ByteString.empty();
					totalBytesReaded = 0;
					nextPercent = 0;
					continueToReadSplitMessage = false;
					state = State.Idle;
					continue;
				} else {

					sendProgressUploadFile(lengghtBytesReceived, size);

					// Continua a leitura do arquivo
					continueToReadSplitMessage = true;
					msgPartsContinue = msgParts;
					sizeContinue = size;
				}
				reading = false;
			}
		} while (reading);
	}

	private void sendProgressUploadFile(int lengghtBytesReceived, int size) {
		totalBytesReaded += lengghtBytesReceived;
		int percent = totalBytesReaded * 100 / size;
		
		if(percent >= nextPercent) {
			log("TcpUserActor - total size readed: " + percent + "%");
			sendPercentToRouter(percent);
			
			nextPercent += 20;
		}
	}

	private void sendPercentToRouter(int percent) {
		if(userActorWhenLogged != null) {
			percent = percent > 100? 100: percent;
			JsonRawMessage raw = AkkaJson.getJsonProgress(percent);
			userActorWhenLogged.tell(raw, getSelf());
		}
	}

	/**
	 * Depois de le a mensagem, envia pro proximo ator.
	 * @param frame
	 */
	private void sendMsgToRouter(ByteString frame) {
		byte[] array = frame.toArray();
		RawMessage message = new RawMessage(array);

		String json = message.getText();
		log("<< TcpConnectionActor.sendMsgToRouter json: " + json);
		log("------");

		JsonRawMessage jsonRawMsg = new JsonRawMessage(getSelf(), message);
		userMessageRouter.tell(jsonRawMsg, getSelf());
	}
	
	/**
	 * Depois de le a mensagem, envia pro proximo ator.
	 * @param frame
	 */
	private void sendToastToRouter(String toast) {
//		Toast t = new Toast(getSelf(), toast);
//		userMessageRouter.tell(t, getSelf());//ok
		
		JsonRawMessage raw = AkkaJson.getJsonToast(toast);
//		tcpConnection.tell(TcpMessage.write(raw.getByteString()), getSelf());//ok
//		tcpConnection.tell(TcpMessage.write(raw.getByteString()), getSender());//ok
		
//		AkkaHelper.tellTcpJson(getSender(),getSender(), raw);//ok
		
//		ByteString byteString = raw.getByteString();
//		getSender().tell(TcpMessage.write(byteString), getSelf());
		
		userActorWhenLogged.tell(raw, getSelf());
	}

	public void stop() {
		log("TcpConnectionActor.stop()");

		if (userActorWhenLogged != null) {
			userActorWhenLogged.tell(new UserActor.RemoveConnection(getSelf()), getSelf());
		}

		getContext().stop(getSelf());
	}

	private void log(String string) {
		int max = 200;
		if(string.length() > max) {
			string = string.substring(0,max);
		}
		if(userActorWhenLogged != null) {
			log.debug("("+userId+"/"+userSo+"): " + string);
		}
		else {
			log.debug(string);
		}
	}

}

Reply via email to