Thanks for that.Yes,Streams might be right.
But I wanted to know if my current implementation is fine or can I tweak it 
further to improve based on few questions that I mentioned ?

On Wednesday, November 23, 2016 at 4:24:42 PM UTC+5:30, √ wrote:
>
> Your use-case sounds like a perfect example of something which would 
> benefit quite a bit from being based on Akka Streams.
>
> On Wed, Nov 23, 2016 at 11:25 AM, kk k <cracke...@gmail.com <javascript:>> 
> wrote:
>
>>
>> This is my first program in akka so I wanted to know if the below program 
>> is efficient and is using the advantages of actor model.
>>
>> ----------
>>
>> The program's purpose is to scan a given directory for any files and 
>> print the number of lines in each file.
>>
>> 1. The main `Application` class will create the actor system and send a 
>> Scan message to a `FileScanner` actor.
>> 2. The `FileScanner` actor will scan the given directory, and for each 
>> file it will create a new `FileParser` actor and send a Parse message. 
>> Also, all the fileparser actors are passed the same Aggregator actor 
>> Reference.
>> 3. The `FileParser` actor will parse the given file, and for each line it 
>> will send a Line message to the Aggregator Actor.
>> 4. The `Aggregator` actor will maintain a count of the number of lines 
>> for each file in an instance hashmap and will print the line count for each 
>> "End" message it receives. Once all files are processed, it will shutdown 
>> the actor system.
>>
>> ----------
>>
>> A few points which I need help on:
>>
>> 1. There is a separate `FileParser` actor for each file. Is this fine? 
>> What is the benefit of using a router which routes to FileParser actor? 
>> Will its use only help in controlling the number of fileparser actors and 
>> also how load is distributed among these actors?
>>
>> 2. There is a single Aggregator actor which counts the number of lines 
>> for each file. It's using an instance `HashMap` and I hope this is fine. Or 
>> will a separate aggregator actor for each file improve performance?
>>
>> 3. Also, I am passing the number of files to Aggregator actor while it's 
>> created so that I can shutdown the actor system once all files are 
>> processed. If I have a separate Aggregator for each file, I'm not sure how 
>> to shutdown.
>>
>> 4. Each file is only sequentially processed i.e a `FileParser` actor is 
>> reading the file sequentially and and then invoking aggregator for each 
>> line. Is this fine or can it be improved.
>>
>> ----------
>>
>> **Application**
>>
>> /**
>> * The Application program bootstraps the actorsystem for parsing files in 
>> a
>> * given directory and finding their linecount
>> * 
>> * @author 
>> * @version 1.0
>> */
>> public class Application {
>> public void start(String directoryPath) {
>> ActorSystem actorSystem = ActorSystem.create("logProcessor");
>> ActorRef fileScanner = actorSystem.actorOf(
>> Props.create(FileScanner.class), "fileScanner");
>> fileScanner.tell(new Scan(directoryPath), ActorRef.noSender());
>> }
>>
>> public static void main(String[] args) {
>>
>> if (args.length < 1) {
>> System.out
>> .println("Usage: java -jar log-process-1.0-SNAPSHOT.jar <directorypath>");
>> System.exit(0);
>> }
>> String path = args[0];
>> Application application = new Application();
>> application.start(path);
>> }
>> }
>>
>> **FileScanner**
>>
>> /**
>> * The FileScanner program scans for files in a given directory
>> * 
>> * @author 
>> * @version 1.0
>> */
>> public class FileScanner extends UntypedActor {
>>
>> public FileScanner() {
>> }
>>
>> /**
>> * Invoked by the Actor System to scan a given directory
>> * 
>> * @param message
>> *            The message to process
>> */
>> public void onReceive(Object message) {
>> ActorRef parser;
>> if (message instanceof Scan) {
>> Scan scan = (Scan) message;
>> System.out.println("Scan directory: " + scan.getDirectory());
>>
>> // Only top level files in the directory are read.No recursion is
>> // done
>> File directory = new File(scan.getDirectory());
>> // Incase of large number of files,we need to optimize below call.
>> File[] files = directory.listFiles();
>> // Required to shutdown actorsystem after all files are processed
>> int numberOfFiles = 0;
>>
>> /*
>> * To only count the files and ignore any folders
>> */
>> for (File file : files) {
>> if (file.isFile())
>> numberOfFiles++;
>> }
>> ActorRef aggregator = getContext()
>> .actorOf(Props.create(Aggregator.class, numberOfFiles),
>> "aggregator");
>> File file;
>> for (int i = 0; i < files.length; i++) {
>> file = files[i];
>> if (!file.isFile())
>> continue;
>> System.out.println(file.getName());
>>
>> /*
>> * Use a unique identifier(counter) for actor names as file
>> * names can have special characters(ex:readme (copy).md) and
>> * hence cannot be directly used as actor names
>> * 
>> * Docs:Actor paths MUST: not start with `$`, // include only
>> * ASCII letters and can only contain these special //
>> * characters: -_.*$+:@&=,!~';.
>> */
>> parser = getContext().actorOf(
>> Props.create(FileParser.class, aggregator),
>> "parser-" + i);
>> parser.tell(new Parse(file.getAbsolutePath()), getSelf());
>> }
>> } else {
>> unhandled(message);
>> }
>> }
>> }
>>
>> **FileParser**
>>
>> /**
>> * The FileScanner program scans for files in a given directory
>> * 
>> * @author 
>> * @version 1.0
>> */
>> public class FileParser extends UntypedActor {
>>
>> /**
>> * An aggregator actor reference to send file events to.
>> */
>> private ActorRef aggregator;
>>
>> public FileParser(ActorRef aggregator) {
>> this.aggregator = aggregator;
>> }
>>
>> /**
>> * Invoked by the mailbox when it receives a thread timeslice and a message
>> * is available to it from FileScanner.It reads only text files and any
>> * other files are not handled
>> * 
>> * @param message
>> *            The message to process
>> */
>> public void onReceive(Object message) {
>> if (message instanceof Parse) {
>> Parse parseMessage = (Parse) message;
>> System.out.println("Parse File : " + parseMessage.getFilePath());
>>
>> Path file = Paths.get(parseMessage.getFilePath());
>> try {
>> BufferedReader reader = Files.newBufferedReader(file,
>> StandardCharsets.UTF_8);
>> String line = null;
>> Line lineMessage = new Line(parseMessage.getFilePath());
>> aggregator.tell(new Start(parseMessage.getFilePath()),
>> getSelf());
>> while ((line = reader.readLine()) != null) {
>> aggregator.tell(lineMessage, getSelf());
>> }
>> aggregator.tell(new End(parseMessage.getFilePath()), getSelf());
>> } catch (IOException e) {
>> e.printStackTrace();
>> }
>> } else {
>> unhandled(message);
>> }
>> }
>>
>> }
>>
>> **Aggregator**
>>
>> /**
>> * The Aggregator program counts the number of lines for each file.
>> * 
>> * @author 
>> * @version 1.0
>> */
>> public class Aggregator extends UntypedActor {
>> /**
>> * Number of files that are processed.
>> */
>> private int numberOfFiles;
>> /**
>> * Number of lines per file
>> */
>> private HashMap<String, Integer> lineCount;
>> /**
>> * A running count of processed files
>> */
>> private int count;
>>
>> public Aggregator(int numberOfFiles) {
>> this.numberOfFiles = numberOfFiles;
>> lineCount = new HashMap<>();
>> }
>>
>> /**
>> * Invoked by the mailbox when it receives a thread timeslice and a file
>> * event(start,line or end) is available from FileParser
>> * 
>> * @param message
>> *            The message to process
>> */
>> public void onReceive(Object message) {
>> if (message instanceof End) {
>> End end = (End) message;
>> count++;// we can use bigint incase of long files.
>> System.out.println("Line count of file : " + end.getFilePath()
>> + " is :" + lineCount.get(end.getFilePath()));
>> if (count >= numberOfFiles) {
>> getContext().system().shutdown();
>> }
>> } else if (message instanceof Line) {
>> Line line = (Line) message;
>> String path = line.getFilePath();
>> if (lineCount.containsKey(path)) {
>> lineCount.put(path, lineCount.get(path) + 1);
>> } else {
>> lineCount.put(path, 1);
>> }
>> } else if (message instanceof Start) {
>> // Started processing a new FIle
>> } else {
>> unhandled(message);
>> }
>> }
>>
>> }
>>
>> -- 
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ: 
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> --- 
>> You received this message because you are subscribed to the Google Groups 
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an 
>> email to akka-user+...@googlegroups.com <javascript:>.
>> To post to this group, send email to akka...@googlegroups.com 
>> <javascript:>.
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
>
>
> -- 
> Cheers,
> √
>

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to