Neither in the jOOQ 3.8 manual, nor anywhere online, can I find examples of
how to use the new transactionAsync() and transactionResultAsync() methods.
I have an existing method that I'd like to transition to the async
paradigm. It goes something like this:

    Mutator<User> mutator = u -> { ... };

    User updatedUser = DSL.using(configuration).transactionResult(transactionConfiguration -> {
        UserRecord userRecord = DSL.using(transactionConfiguration)
            .selectFrom(USER)
            .where(USER.USER_ID.equal(userId))
            .forUpdate()
            .fetchOne();

        if (userRecord == null) {
            throw new ObjectNotFoundException("User(userId = %d) not found", userId);
        }
        else {
            User user = new User(userRecord);
            mutator.mutate(user);
            userRecord.store();
            return user;
        }
    });

fetchOne(), store(), and transactionResult() are all blocking calls. The
first obvious step is to use the new transactionResultAsync() method, which
looks like this:

    CompletionStage<User> updatedUserStage =
        DSL.using(configuration).transactionResultAsync(transactionConfiguration -> {
            UserRecord userRecord = DSL.using(transactionConfiguration)
                .selectFrom(USER)
                .where(USER.USER_ID.equal(userId))
                .forUpdate()
                .fetchOne();

            if (userRecord == null) {
                throw new ObjectNotFoundException("User(userId = %d) not found", userId);
            }
            else {
                User user = new User(userRecord);
                mutator.mutate(user);
                userRecord.store();
                return user;
            }
        });

This is a step in the right direction. The caller of the method now has a
CompletionStage that can be chained, composed, etc. in a non-blocking way.
But store() and fetchOne() are still blocking calls, so the executor thread
that invokes the transactionResultAsync() lambda is still going to block.
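
To make that concern concrete, here is a minimal sketch that uses only java.util.concurrent and no jOOQ at all. Thread.sleep() is a hypothetical stand-in for a blocking call such as fetchOne() or store(), and the names BlockingInsideAsync, blockingCall(), runAsync() and the "tx-worker" thread are made up for illustration. It only shows that the caller returns immediately while the executor thread stays occupied for the duration of the blocking call:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class BlockingInsideAsync {

    // Hypothetical stand-in for a blocking JDBC round trip (fetchOne(), store(), ...).
    // Returns the name of the thread that was blocked while it ran.
    static String blockingCall() {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return Thread.currentThread().getName();
    }

    // Hands the blocking call to the executor; the caller only receives a stage.
    static CompletionStage<String> runAsync(ExecutorService executor) {
        return CompletableFuture.supplyAsync(BlockingInsideAsync::blockingCall, executor);
    }

    public static void main(String[] args) {
        ExecutorService executor =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "tx-worker"));
        try {
            CompletionStage<String> stage = runAsync(executor);

            // The caller is not blocked by the sleep and reaches this line right away.
            System.out.println("caller continues on: " + Thread.currentThread().getName());

            // join() is used here only so the demo can print the result before exiting.
            System.out.println("blocking call ran on: " + stage.toCompletableFuture().join());
        } finally {
            executor.shutdown();
        }
    }
}
```

With a single-threaded executor like the one above, nothing else submitted to it can run until the blocking call returns, which is the cost I'm trying to avoid.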
This is where I'm uncertain on how to proceed. Was it the designers' intent
that we would stop here? I'd like to continue with fetchAsync() but it
starts to get hairy:

    CompletionStage<CompletionStage<User>> userCompletionStageCompletionStage =
        DSL.using(configuration)
            .transactionResultAsync(transactionConfiguration ->
                DSL.using(transactionConfiguration)
                    .selectFrom(USER)
                    .where(USER.USER_ID.equal(userId))
                    .forUpdate()
                    .fetchAsync()
                    .thenApply(result -> {
                        if (result.isEmpty()) {
                            throw new ObjectNotFoundException("User(userId = %d) not found", userId);
                        }
                        else {
                            UserRecord userRecord = result.get(0);
                            User user = new User(userRecord);
                            mutator.mutate(user);
                            userRecord.store();
                            return user;
                        }
                    })
            );

That just doesn't seem right. If it weren't for the fact that the
asynchronous transactional scope "wraps" the inner stage, we could compose the
stages together and flatten them. This is, in fact, the approach we could
take if we wanted to squash that last blocking call, store():

    CompletionStage<CompletionStage<User>> userCompletionStageCompletionStage =
        DSL.using(configuration)
            .transactionResultAsync(transactionConfiguration -> {
                CompletionStage<Result<UserRecord>> selectCompletionStage =
                    DSL.using(transactionConfiguration)
                        .selectFrom(USER)
                        .where(USER.USER_ID.equal(userId))
                        .forUpdate()
                        .fetchAsync();

                Function<Result<UserRecord>, CompletionStage<User>> mutateAndPersistAsync = result -> {
                    if (result.isEmpty()) {
                        throw new ObjectNotFoundException("User(userId = %d) not found", userId);
                    }
                    else {
                        return CompletableFuture.supplyAsync(() -> {
                            UserRecord userRecord = result.get(0);
                            User user = new User(userRecord);
                            mutator.mutate(user);
                            userRecord.store();
                            return user;
                        }, configuration.executorProvider().provide());
                    }
                };

                return selectCompletionStage.thenCompose(mutateAndPersistAsync);
            });

But that still leaves us with CompletionStage<CompletionStage<User>>. Am I
missing something? Any and all feedback appreciated.
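
For reference, the nested type on its own can be collapsed mechanically with plain java.util.concurrent, independent of jOOQ. The following is a minimal sketch under that assumption; FlattenStages, flatten(), inner() and nested() are hypothetical names, and the stages are stand-ins rather than real queries:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

class FlattenStages {

    // Collapses a stage-of-a-stage into one stage that completes with the inner result.
    static <T> CompletionStage<T> flatten(CompletionStage<CompletionStage<T>> nested) {
        return nested.thenCompose(Function.identity());
    }

    // Stand-in for the inner asynchronous work (e.g. the fetch/store chain).
    static CompletionStage<String> inner() {
        return CompletableFuture.supplyAsync(() -> "user-42");
    }

    // Stand-in for an outer async scope whose lambda itself returns a stage.
    static CompletionStage<CompletionStage<String>> nested() {
        return CompletableFuture.supplyAsync(FlattenStages::inner);
    }

    public static void main(String[] args) {
        CompletionStage<String> flat = flatten(nested());
        // join() is used only so the demo can print before the JVM exits.
        System.out.println(flat.toCompletableFuture().join()); // prints "user-42"
    }
}
```

This only fixes the shape of the return type. It says nothing about whether the transaction is still open when the inner stage finally runs, which is the part I'm really unsure about.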
Thanks!