xtern commented on code in PR #6889:
URL: https://github.com/apache/ignite-3/pull/6889#discussion_r2501901341
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/Query.java:
##########
@@ -116,6 +123,64 @@ class Query {
this.parsedResult = parsedResult;
}
+ <ResultT> CompletableFuture<ResultT> runProgram(Program<ResultT> program) {
+ ProgramExecutionState<ResultT> state = program.createState();
+
+ if (!activeProgram.compareAndSet(null, state)) {
+ throw new SqlException(Common.INTERNAL_ERR);
Review Comment:
mb add some message or comment to clarify what it means?
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/Program.java:
##########
@@ -101,31 +101,55 @@ CompletableFuture<ResultT> run(Query query) {
ex = ExceptionUtils.unwrapCause(ex);
// handles exception from asynchronous
part of phase evaluation
- try {
- if (errorHandler.test(query, ex)) {
- query.executor.execute(() ->
run(query));
- }
- } catch (AssertionError | Exception ex0) {
- LOG.warn("Exception in error handler
[queryId={}]", ex0, query.id);
-
- query.onError(ex);
+ if (shouldRetry(query, ex)) {
+ query.executor.execute(() ->
run(query, state));
+ } else {
+ query.setError(ex);
+ finalizeActiveProgram(query, state);
}
return;
}
query.executor.execute(() -> {
- if (advanceQuery(query)) {
- run(query);
+ if (advanceQuery(query, state)) {
+ run(query, state);
}
});
});
break;
}
}
- } while (advanceQuery(query));
+ } while (advanceQuery(query, state));
+ }
+
+ private boolean shouldRetry(Query query, Throwable th) {
+ try {
+ if (errorHandler.test(query, th)) {
+ return true;
+ }
+ } catch (AssertionError | Exception ex) {
+ LOG.warn("Exception in error handler [queryId={}]", ex, query.id);
+
+ query.terminateExceptionally(th);
+ }
- return Commons.cast(query.resultHolder);
+ return false;
+ }
+
+ private static void finalizeActiveProgram(Query query,
ProgramExecutionState<?> executionState) {
+ ProgramExecutionHandle activeHandle =
query.activeProgram.getAndSet(null);
Review Comment:
Why we need "executionState" here?
I mean we can use `activeHandle.completionFuture()` instead of
`executionState.programFinished`
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/Query.java:
##########
@@ -116,6 +123,64 @@ class Query {
this.parsedResult = parsedResult;
}
+ <ResultT> CompletableFuture<ResultT> runProgram(Program<ResultT> program) {
+ ProgramExecutionState<ResultT> state = program.createState();
+
+ if (!activeProgram.compareAndSet(null, state)) {
+ throw new SqlException(Common.INTERNAL_ERR);
+ }
+
+ program.run(this, state);
+
+ return state.resultHolder;
+ }
+
+ /**
+ * Initiates a graceful termination of the current query.
+ *
+ * <p>If the query is idle, moves it to {@link ExecutionPhase#TERMINATED}
phase immediately. Otherwise, the method waits for the
+ * currently active program to complete before proceeding. Note that
termination may not take effect immediately upon return. To wait
+ * for the actual completion of termination, use {@link
#terminationFuture}.
+ */
+ void terminate() {
+ tryTerminate(1);
+ }
+
+ private void tryTerminate(int attemptNo) {
+ if (attemptNo >= MAX_ATTEMPTS_COUNT) {
Review Comment:
Just curious - can I somehow simulate repeated attempts (attempts > 2)?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]