I'm a newbie to Akka. Here's one thing I'm trying to achieve: a simple job manager. The jobs involve SQL queries and CPU-intensive computation, and they should be cancellable while they're running. Here's my code to simulate it. The worker actor sends a "continue" message to itself as long as no cancellation signal has arrived. However, the master actor may send another job to the worker while the current job is still running. The question is: how do I guarantee that the jobs don't step on each other? This may not be the idiomatic way to use actors, so any suggestions are very welcome.
import java.io.IOException; import akka.actor.AbstractActor; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; import akka.routing.RoundRobinPool; class StartJob { private final String id; private final int time; public StartJob(String id, int time) { super(); this.id = id; this.time = time; } public String getId() { return id; } public int getTime() { return time; } } class CancelJob { private final String id; public CancelJob(String id) { super(); this.id = id; } public String getId() { return id; } } class Worker extends AbstractActor { private String jobId; private boolean cancelling; private int time; private int i; @Override public Receive createReceive() { return receiveBuilder().match(StartJob.class, job -> { jobId = job.getId(); cancelling = false; i = 0; time = job.getTime(); System.out.printf("Worker %s starts with id=%s\n", self(), job.getId()); self().tell("continue", self()); }).matchEquals("continue", p -> { if (cancelling) { System.out.printf("Worker %s cancelled.\n", jobId); return; } System.out.printf("Worker %s progress %.0f%%\n", jobId, i * 100.0 / time); Thread.sleep(1000); // simulate SQL query for (int j=0; j<1000000; j++); // simulate CPU intended job i++; if (i < time) { self().tell("continue", self()); } else { System.out.printf("Worker %s finished.\n", self()); } }).match(CancelJob.class, job -> job.getId().equals(jobId), job -> { System.out.printf("Worker %s cancelling.\n", job.getId()); cancelling = true; }).build(); } } class Master extends AbstractActor { private final ActorRef workerRouter; public Master(int numWorkers) { workerRouter = this.getContext().actorOf(Props.create(Worker.class).withRouter(new RoundRobinPool(numWorkers)), "workerRouter"); } @Override public Receive createReceive() { return receiveBuilder().match(StartJob.class, job -> { System.out.printf("Submitting job %s\n", job.getId()); workerRouter.tell(job, self()); }).match(CancelJob.class, job -> { System.out.printf("Cancelling job 
%s\n", job.getId()); getContext().actorSelection("workerRouter/*").tell(job, self()); }).build(); } } public class AnalyticsApp { public static void main(String[] args) throws IOException, InterruptedException { ActorSystem system = ActorSystem.create("JobManager"); ActorRef masterRef = system.actorOf(Props.create(Master.class, 2), "job-manager"); System.out.println("Master: " + masterRef); for (int i = 0; i < 4; i++) { masterRef.tell(new StartJob(Integer.toString(i), 5), ActorRef.noSender()); } Thread.sleep(1000); masterRef.tell(new CancelJob("1"), ActorRef.noSender()); System.out.println(">>> Press ENTER to exit <<<"); try { System.in.read(); } finally { system.terminate(); } } } -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.