I figured the same, so I did the following:
private final ActorRef sender = sender();
private final ActorRef self = self();
private final ActorContext context = context();
private final ActorRef parent = context.parent();
This did not help, though. That is also why I logged self, sender and
parent on the first line of the constructor of the DataWorker class below,
before any additional asynchronous work (other than what Akka itself does).
Let me try to paste part of the class here and just remove the parts that
don't matter. Hope that is enough, yet not too big to be annoying:
public class GridActor2 extends AbstractActorWithStash {
private Optional<GridFullHeader> gridFullHeader = Optional.empty();
private final GridService gridService;
private final Cluster cluster =
Cluster.builder().addContactPoint("localhost").build();
private final Session session = cluster.connect("gradebook");
/**
 * Creates the grid actor and installs its initial behaviour.
 *
 * <p>While no header is cached, an incoming message is stashed, a child
 * {@code Worker} named "init" is asked for the header, and the actor switches
 * to a temporary behaviour that stashes everything except the
 * {@code GridFullHeaderResponse}. When that response arrives the header is
 * cached, the stash is released and the temporary behaviour is dropped.
 * Once a header is cached, every message is forwarded to a freshly created
 * {@code Worker}.
 *
 * @param name        value carried by the {@code GridFullHeaderRequest}
 * @param gridService service instance handed to every {@code Worker}
 */
public GridActor2(final String name, final GridService gridService) {
    this.gridService = gridService;
    Logger.debug("start grid actor");
    receive(ReceiveBuilder.matchAny(message -> {
        // Header already cached: delegate the message to a new worker, keeping the original sender.
        if (gridFullHeader.isPresent()) {
            final Props workerProps =
                    Props.create(GridActor2.Worker.class, session, gridFullHeader.get(), this.gridService);
            final ActorRef delegate = context().actorOf(workerProps);
            delegate.forward(message, context());
            return;
        }

        // No header yet: park the message and ask an "init" worker to load the header.
        stash();
        final Props initProps = Props.create(GridActor2.Worker.class, session, null, this.gridService);
        final ActorRef initWorker = context().actorOf(initProps, "init");
        initWorker.tell(new GridFullHeaderRequest(name), self());

        // Until the header arrives, stash everything else.
        context().become(
                ReceiveBuilder
                        .match(GridFullHeaderResponse.class, headerResponse -> {
                            Logger.debug("GridFullHeaderResponse!!!!!");
                            gridFullHeader = Optional.of(headerResponse.header);
                            unstashAll();
                            context().unbecome();
                        })
                        .matchAny(other -> stash())
                        .build());
    }).build());
}
/**
 * Releases resources when the actor stops.
 *
 * <p>Delegates to {@code super.postStop()} first: the stash-enabled base
 * class uses that callback to flush messages that are still stashed, and
 * Akka's stash documentation requires overrides to call it. The Cassandra
 * session and the cluster it was created from are then closed
 * asynchronously, even if the superclass callback throws.
 *
 * @throws Exception if the superclass callback fails
 */
@Override
public void postStop() throws Exception {
    try {
        super.postStop();
    } finally {
        session.closeAsync();
        // The cluster owns the connection pools; closing only the session leaks it.
        cluster.closeAsync();
    }
}
/**
 * Child actor that assembles a {@code GridFullHeader} from three partial
 * results. On a {@code GridFullHeaderRequest} it starts three
 * {@code DataWorker} children ("w1", "w2", "w3"), sends each one query
 * message, and then waits for their responses. As soon as all three parts
 * are present it sends a {@code GridFullHeaderResponse} to its parent.
 */
static class Worker extends AbstractActor {
    private final GridFullHeader header;
    private final GridService gridService;

    // Parts of the grid header; each one is filled in by a single response message.
    private Optional<List<Student>> students = Optional.empty();
    private Optional<List<Assignment>> assignments = Optional.empty();
    private Optional<UpdateTimeHolder> updateTimeHolder = Optional.empty();

    /**
     * @param session        Cassandra session passed on to the {@code DataWorker} children
     * @param gridFullHeader header stored in this worker (may be {@code null}; the parent
     *                       passes {@code null} for the "init" worker)
     * @param gridService    service instance stored in this worker
     */
    public Worker(Session session, GridFullHeader gridFullHeader, GridService gridService) {
        this.header = gridFullHeader;
        this.gridService = gridService;
        Logger.debug("Start worker");
        receive(ReceiveBuilder.match(GridFullHeaderRequest.class, request -> {
            Logger.debug("GridFullHeaderRequest");
            final ActorRef studentsWorker = context().actorOf(Props.create(DataWorker.class, session), "w1");
            final ActorRef assignmentsWorker = context().actorOf(Props.create(DataWorker.class, session), "w2");
            final ActorRef updateTimesWorker = context().actorOf(Props.create(DataWorker.class, session), "w3");
            studentsWorker.tell(new SelectStudents(request.courseId), self());
            assignmentsWorker.tell(new SelectAssignments(request.courseId), self());
            updateTimesWorker.tell(new UpdateTimes(request.courseId), self());

            // From here on only the three partial responses are handled.
            context().become(
                    ReceiveBuilder
                            .match(SelectStudentsResponse.class, reply -> {
                                students = Optional.of(reply.students);
                                if (allPartsPresent()) {
                                    final GridFullHeader assembled = new GridFullHeader(
                                            reply.courseId, students.get(), assignments.get(),
                                            updateTimeHolder.get());
                                    context().parent().tell(
                                            new GridFullHeaderResponse(reply.courseId, assembled), self());
                                }
                            })
                            .match(SelectAssignmentsResponse.class, reply -> {
                                assignments = Optional.of(reply.assignments);
                                if (allPartsPresent()) {
                                    final GridFullHeader assembled = new GridFullHeader(
                                            reply.courseId, students.get(), assignments.get(),
                                            updateTimeHolder.get());
                                    context().parent().tell(
                                            new GridFullHeaderResponse(reply.courseId, assembled), self());
                                }
                            })
                            .match(UpdateTimesResponse.class, reply -> {
                                updateTimeHolder = Optional.of(reply.updateTimeHolder);
                                if (allPartsPresent()) {
                                    final GridFullHeader assembled = new GridFullHeader(
                                            reply.courseId, students.get(), assignments.get(),
                                            updateTimeHolder.get());
                                    context().parent().tell(
                                            new GridFullHeaderResponse(reply.courseId, assembled), self());
                                }
                            })
                            .build());
        }).build());
    }

    /** Returns true once students, assignments and update times have all been received. */
    private boolean allPartsPresent() {
        return students.isPresent() && assignments.isPresent() && updateTimeHolder.isPresent();
    }
}
static class DataWorker extends AbstractActor {
private final BoundStatement selectStudents;
private final BoundStatement selectAssignments;
private final BoundStatement updateTimes;
private final ActorRef sender = sender();
private final ActorRef self = self();
private final ActorContext context = context();
private final ActorRef parent = context.parent();
public DataWorker(Session session) {
//Logging was done here
this.selectStudents = new
BoundStatement(session.prepare("SELECT student_id, firstname, lastname,
average FROM gradebook.students WHERE course_id = ?;"));
this.selectAssignments = new
BoundStatement(session.prepare("SELECT assignment_id, name, average,
ordernum FROM gradebook.assignments WHERE course_id = ?;"));
this.updateTimes = new BoundStatement(session.prepare("SELECT
last_assignment_update, last_student_update, last_grade_update FROM
gradebook.update_times WHERE course_id = ?;"));
receive(
ReceiveBuilder.match(SelectStudents.class, request -> {
Logger.debug("SelectStudents");
selectStudents.setString("course_id",
request.courseId);
RichResultSetFuture.wrap(session.executeAsync(selectStudents)).onComplete(rs
-> {
final List<Student> list =
rs.all().stream().map(r -> new Student(r.getString("firstname"),
r.getString("lastname"), r.getString("student_id"),
r.getString("student_id"),
r.getDouble("average"))).collect(Collectors.toList());
parent.tell(new
SelectStudentsResponse(request.courseId, list), self);
context.stop(self);
Logger.debug(self.path().toString() + " sender:
" + sender.path().toString());
});
}).match(SelectAssignments.class, request -> {
selectAssignments.setString("course_id",
request.courseId);
RichResultSetFuture.wrap(session.executeAsync(selectAssignments)).onComplete(rs
-> {
final List<Assignment> list =
rs.all().stream().map(r -> new Assignment(r.getString("assignment_id"),
r.getString("name"), r.getDouble("average"))).collect(Collectors.toList());
parent.tell(new
SelectAssignmentsResponse(request.courseId, list), self);
context().stop(self);
});
}).match(UpdateTimes.class, request -> {
updateTimes.setString("course_id",
request.courseId);
RichResultSetFuture.wrap(session.executeAsync(updateTimes)).onComplete(rs
-> {
final UpdateTimeHolder updateTimeHolder =
rs.all().stream().map(r -> new
UpdateTimeHolder(OffsetDateTime.ofInstant(r.getDate("last_student_update").toInstant(),
ZoneId.systemDefault()),
OffsetDateTime.ofInstant(r.getDate("last_assignment_update").toInstant(),
ZoneId.systemDefault()),
OffsetDateTime.ofInstant(r.getDate("last_grade_update").toInstant(),
ZoneId.systemDefault())
)).collect(Collectors.toList()).get(0);
parent.tell(new
UpdateTimesResponse(request.courseId, updateTimeHolder), sender);
context().stop(self);
});
}).build()
);
}
}
}
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.