korlov42 commented on code in PR #4615:
URL: https://github.com/apache/ignite-3/pull/4615#discussion_r1834191539


##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutor.java:
##########
@@ -310,13 +314,26 @@ MultiStatementHandler createScriptHandler(Query query) {
         );
     }
 
-    private void trackQuery(Query query) {
+    private void trackQuery(Query query, @Nullable CancellationToken 
cancellationToken) {
         Query old = runningQueries.put(query.id, query);
 
         assert old == null : "Query with the same id already registered";
 
-        query.onPhaseStarted(ExecutionPhase.TERMINATED)
-                .whenComplete((ignored, ex) -> 
runningQueries.remove(query.id));
+        CompletableFuture<Void> queryTerminationFut = 
query.onPhaseStarted(ExecutionPhase.TERMINATED);
+
+        if (cancellationToken != null) {
+            CompletableFuture<Void> cancellationFuture = 
CancelHandleHelper.getCancellationFuture(cancellationToken);
+            CancelHandleHelper.addCancelAction(cancellationToken, 
query.cancel::cancel, queryTerminationFut);

Review Comment:
   The actual "completion future" to wait on is the one returned from 
`queryTerminationFut.whenComplete(...)`.
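
To illustrate the point, here is a minimal standalone sketch (not the PR's code; the class name `CompletionOrderDemo` and the event labels are made up). It shows that the future returned by `whenComplete(...)` completes only after the callback has run, so waiting on it also waits for the cleanup:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class, not part of Ignite.
final class CompletionOrderDemo {
    static List<String> run() {
        List<String> events = new ArrayList<>();

        // Stands in for query.onPhaseStarted(ExecutionPhase.TERMINATED).
        CompletableFuture<Void> termination = new CompletableFuture<>();

        // The returned future completes only after the cleanup callback has finished.
        CompletableFuture<Void> afterCleanup =
                termination.whenComplete((ignored, ex) -> events.add("cleanup"));

        // Anything chained on afterCleanup therefore observes the cleanup as done.
        afterCleanup.thenRun(() -> events.add("released"));

        termination.complete(null);

        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Passing `afterCleanup` rather than `termination` as the completion future would mean the handle is released only after the query has been removed from the registry.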



##########
modules/api/src/main/java/org/apache/ignite/lang/CancelHandleImpl.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.lang;
+
+import java.util.ArrayDeque;
+import java.util.concurrent.CompletableFuture;
+
+/** Implementation of {@link CancelHandle}. */
+final class CancelHandleImpl implements CancelHandle {
+
+    private final CompletableFuture<Void> cancelFut = new 
CompletableFuture<>();
+
+    private final CancellationTokenImpl token;
+
+    CancelHandleImpl() {
+        this.token = new CancellationTokenImpl(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void cancel() {
+        doCancelAsync();
+
+        cancelFut.join();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> cancelAsync() {
+        doCancelAsync();
+
+        return cancelFut;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean isCancelled() {
+        return token.isCancelled();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CancellationToken token() {
+        return token;
+    }
+
+    private void doCancelAsync() {
+        token.cancel();
+    }
+
+    static final class CancellationTokenImpl implements CancellationToken {
+
+        private final ArrayDeque<Cancellation> cancellations = new 
ArrayDeque<>();
+
+        private final CancelHandleImpl handle;
+
+        private final Object mux = new Object();
+
+        private volatile CompletableFuture<Void> cancelFut;
+
+        CancellationTokenImpl(CancelHandleImpl handle) {
+            this.handle = handle;
+        }
+
+        void addCancelAction(Runnable action, CompletableFuture<?> fut) {
+            Cancellation cancellation = new Cancellation(action, fut);
+
+            if (cancelFut != null) {
+                cancellation.run();
+            } else {
+                synchronized (mux) {
+                    if (cancelFut == null) {
+                        cancellations.add(cancellation);
+                    }
+                }
+            }
+        }
+
+        CompletableFuture<Void> cancelHandleFut() {
+            return handle.cancelFut;
+        }
+
+        boolean isCancelled() {
+            return cancelFut != null;
+        }
+
+        @SuppressWarnings("rawtypes")
+        void cancel() {
+            if (cancelFut != null) {
+                return;
+            }
+
+            synchronized (mux) {
+                if (cancelFut != null) {
+                    return;
+                }
+
+                // First assemble all completion futures
+                CompletableFuture[] futures = cancellations.stream()
+                        .map(c -> c.completionFut)
+                        .toArray(CompletableFuture[]::new);
+
+                // handle.cancelFut completes when all cancellation futures 
complete.
+                cancelFut = CompletableFuture.allOf(futures).whenComplete((r, 
t) -> {
+                    handle.cancelFut.complete(null);
+                });
+            }
+
+            // Run cancellation actions outside of lock
+            for (Cancellation cancellation : cancellations) {
+                cancellation.run();
+            }

Review Comment:
   We need to introduce error handling here: catch exceptions, combine them 
into a single one via `addSuppressed` and, perhaps, complete `handle.cancelFut` 
exceptionally (this must be reflected in the javadoc).
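
A minimal sketch of what such error handling could look like, assuming the actions are plain `Runnable`s. The class `CancelActionRunner` and its methods are hypothetical helpers for illustration, not existing Ignite code:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical helper, not part of Ignite.
final class CancelActionRunner {
    /**
     * Runs every action even if some of them fail.
     * Returns null if all succeeded, otherwise the first failure
     * with all later failures attached as suppressed exceptions.
     */
    static Throwable runAll(List<Runnable> actions) {
        Throwable failure = null;

        for (Runnable action : actions) {
            try {
                action.run();
            } catch (Throwable t) {
                if (failure == null) {
                    failure = t;
                } else if (failure != t) {
                    // addSuppressed rejects self-suppression, hence the identity check.
                    failure.addSuppressed(t);
                }
            }
        }

        return failure;
    }

    /** Completes the future normally or exceptionally depending on the collected failure. */
    static void complete(CompletableFuture<Void> fut, Throwable failure) {
        if (failure == null) {
            fut.complete(null);
        } else {
            fut.completeExceptionally(failure);
        }
    }
}
```

Whether the handle future should really fail, or only log the error, is a design decision for the PR; the sketch only shows the mechanics.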



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutor.java:
##########
@@ -310,13 +314,26 @@ MultiStatementHandler createScriptHandler(Query query) {
         );
     }
 
-    private void trackQuery(Query query) {
+    private void trackQuery(Query query, @Nullable CancellationToken 
cancellationToken) {
         Query old = runningQueries.put(query.id, query);
 
         assert old == null : "Query with the same id already registered";
 
-        query.onPhaseStarted(ExecutionPhase.TERMINATED)
-                .whenComplete((ignored, ex) -> 
runningQueries.remove(query.id));
+        CompletableFuture<Void> queryTerminationFut = 
query.onPhaseStarted(ExecutionPhase.TERMINATED);
+
+        if (cancellationToken != null) {
+            CompletableFuture<Void> cancellationFuture = 
CancelHandleHelper.getCancellationFuture(cancellationToken);
+            CancelHandleHelper.addCancelAction(cancellationToken, 
query.cancel::cancel, queryTerminationFut);
+
+            queryTerminationFut.whenComplete((ignored, ex) -> {
+                runningQueries.remove(query.id);
+                cancellationFuture.complete(null);

Review Comment:
   It seems that the first finished query attached to a token will release the 
cancellation handle even though cancellation has never been requested.
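
The hazard can be reproduced with plain futures. The following sketch is hypothetical (the class name and the two "query" futures are made up); it models two queries sharing one handle future, each completing it on termination as the diff does:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class, not part of Ignite.
final class SharedFutureHazard {
    /** Returns true if the handle future is done while the second query is still running. */
    static boolean handleReleasedEarly() {
        CompletableFuture<Void> handleFut = new CompletableFuture<>();
        CompletableFuture<Void> firstQuery = new CompletableFuture<>();
        CompletableFuture<Void> secondQuery = new CompletableFuture<>();

        // Both queries complete the shared handle future when they terminate.
        firstQuery.whenComplete((ignored, ex) -> handleFut.complete(null));
        secondQuery.whenComplete((ignored, ex) -> handleFut.complete(null));

        // Only the first query finishes; nobody has requested cancellation.
        firstQuery.complete(null);

        return handleFut.isDone() && !secondQuery.isDone();
    }

    public static void main(String[] args) {
        System.out.println(handleReleasedEarly());
    }
}
```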



##########
modules/core/src/main/java/org/apache/ignite/lang/CancelHandleHelper.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.lang;
+
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.lang.CancelHandleImpl.CancellationTokenImpl;
+
+/**
+ * Utility class to provide direct access to internals of {@link 
CancelHandleImpl}.
+ */
+public final class CancelHandleHelper {
+
+    private CancelHandleHelper() {
+
+    }
+
+    /**
+     * Attaches a cancellable operation to the given token. A cancellation 
procedure started its handle completes
+     * when {@code completionFut} completes.
+     *
+     * <p>NOTE: If a handle, this token is associated with, was cancelled or 
its cancellation was requested,
+     * this method immediately invokes {@code cancelAction.run()} and it this 
case
+     * <b>it never waits for {@code completionFut} to complete</b>.
+     *
+     * <p>The following methods request cancellation of a handle:
+     * <ul>
+     *     <li>{@link CancelHandle#cancel()}</li>
+     *     <li>{@link CancelHandle#cancelAsync()}</li>
+     * </ul>
+     *
+     * @param token Cancellation token.
+     * @param cancelAction Action that terminates an operation.
+     * @param completionFut Future that completes when operation completes and 
all resources it created are released.
+     */
+    public static void addCancelAction(
+            CancellationToken token,
+            Runnable cancelAction,
+            CompletableFuture<?> completionFut
+    ) {
+        Objects.requireNonNull(token, "token");
+        Objects.requireNonNull(cancelAction, "cancelAction");
+        Objects.requireNonNull(completionFut, "completionFut");
+
+        CancellationTokenImpl t = unwrapToken(token);
+        t.addCancelAction(cancelAction, completionFut);
+    }
+
+    /**
+     * Returns a future associated with this cancellation token's handle.
+     *
+     * @param token Cancellation token.
+     */
+    public static CompletableFuture<Void> 
getCancellationFuture(CancellationToken token) {

Review Comment:
   This is dangerous. The only possible interaction with the handle must go 
through `addCancelAction`; otherwise it's possible to accidentally complete the 
handle future prematurely (like you do in [this 
line](https://github.com/apache/ignite-3/pull/4615/files#diff-a1d097836125cf72c6dd82aa6b8439e13187b29aaf8e844b341a497b20cc0f94R330)).



##########
modules/api/src/main/java/org/apache/ignite/lang/CancelHandleImpl.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.lang;
+
+import java.util.ArrayDeque;
+import java.util.concurrent.CompletableFuture;
+
+/** Implementation of {@link CancelHandle}. */
+final class CancelHandleImpl implements CancelHandle {
+
+    private final CompletableFuture<Void> cancelFut = new 
CompletableFuture<>();
+
+    private final CancellationTokenImpl token;
+
+    CancelHandleImpl() {
+        this.token = new CancellationTokenImpl(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void cancel() {
+        doCancelAsync();
+
+        cancelFut.join();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> cancelAsync() {
+        doCancelAsync();
+
+        return cancelFut;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean isCancelled() {
+        return token.isCancelled();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CancellationToken token() {
+        return token;
+    }
+
+    private void doCancelAsync() {
+        token.cancel();
+    }
+
+    static final class CancellationTokenImpl implements CancellationToken {
+
+        private final ArrayDeque<Cancellation> cancellations = new 
ArrayDeque<>();
+
+        private final CancelHandleImpl handle;
+
+        private final Object mux = new Object();
+
+        private volatile CompletableFuture<Void> cancelFut;
+
+        CancellationTokenImpl(CancelHandleImpl handle) {
+            this.handle = handle;
+        }
+
+        void addCancelAction(Runnable action, CompletableFuture<?> fut) {
+            Cancellation cancellation = new Cancellation(action, fut);
+
+            if (cancelFut != null) {
+                cancellation.run();
+            } else {
+                synchronized (mux) {
+                    if (cancelFut == null) {

Review Comment:
   If we lose the race, we still need to run the given action.
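
One possible way to close the race, sketched under simplifying assumptions: a boolean flag stands in for the `cancelFut` check and completion futures are omitted. `CancelActionRegistry` is a hypothetical name, not the PR's class:

```java
import java.util.ArrayDeque;

// Hypothetical simplified model of the token, not part of Ignite.
final class CancelActionRegistry {
    private final ArrayDeque<Runnable> actions = new ArrayDeque<>();
    private final Object mux = new Object();
    private boolean cancelled; // guarded by mux

    void addCancelAction(Runnable action) {
        boolean runNow;

        synchronized (mux) {
            // Decide under the lock, so a concurrent cancel() cannot slip in between.
            runNow = cancelled;

            if (!runNow) {
                actions.add(action);
            }
        }

        if (runNow) {
            // Cancellation already happened (or we lost the race): run the action ourselves.
            action.run();
        }
    }

    void cancel() {
        Runnable[] toRun;

        synchronized (mux) {
            if (cancelled) {
                return;
            }

            cancelled = true;
            toRun = actions.toArray(new Runnable[0]);
            actions.clear();
        }

        // Run the registered actions outside of the lock.
        for (Runnable action : toRun) {
            action.run();
        }
    }
}
```

With this shape every action runs exactly once: either from `cancel()` if it was registered in time, or directly from `addCancelAction` otherwise.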



##########
modules/api/src/main/java/org/apache/ignite/lang/CancelHandleImpl.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.lang;
+
+import java.util.ArrayDeque;
+import java.util.concurrent.CompletableFuture;
+
+/** Implementation of {@link CancelHandle}. */
+final class CancelHandleImpl implements CancelHandle {
+
+    private final CompletableFuture<Void> cancelFut = new 
CompletableFuture<>();
+
+    private final CancellationTokenImpl token;
+
+    CancelHandleImpl() {
+        this.token = new CancellationTokenImpl(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void cancel() {
+        doCancelAsync();
+
+        cancelFut.join();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> cancelAsync() {
+        doCancelAsync();
+
+        return cancelFut;

Review Comment:
   Let's return a copy of the original `cancelFut`, so that completing the 
returned future won't complete the handle.
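
A small sketch of the idea, assuming Java 9 or later, where `CompletableFuture.copy()` is available. The class `DefensiveCopyDemo` and its helper methods are hypothetical and exist only for the illustration:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class, not part of Ignite.
final class DefensiveCopyDemo {
    private final CompletableFuture<Void> cancelFut = new CompletableFuture<>();

    /** Returns a dependent copy; completing it does not touch the internal future. */
    CompletableFuture<Void> cancelAsync() {
        return cancelFut.copy();
    }

    /** Stands in for the internal completion performed once all operations terminate. */
    void finish() {
        cancelFut.complete(null);
    }

    boolean internalDone() {
        return cancelFut.isDone();
    }
}
```

A caller that completes the returned copy only affects its own instance; the internal future still completes solely through `finish()`, and copies taken before or after that point observe its result.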



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java:
##########
@@ -463,11 +464,9 @@ private AsyncDataCursor<InternalSqlRow> 
executeDdl(SqlOperationContext operation
         assert queryCancel != null;
 
         queryCancel.add(timeout -> {
-            if (!timeout) {
-                return;
-            }
+            String message = timeout ? QueryCancelledException.TIMEOUT_MSG : 
QueryCancelledException.CANCEL_MSG;
 
-            ret.completeExceptionally(new 
QueryCancelledException(QueryCancelledException.TIMEOUT_MSG));
+            ret.completeExceptionally(new QueryCancelledException(message));

Review Comment:
   This comment does not seem to have been addressed yet.



##########
modules/api/src/main/java/org/apache/ignite/lang/CancellationToken.java:
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.lang;
+
+/**
+ * Cancellation token an object is issued by {@link CancelHandle} that can be 
used by an operation or a resource to observe a signal
+ * to terminate it.
+ */

Review Comment:
   ```suggestion
    * Cancellation token is an object that is issued by {@link CancelHandle} 
and can be used by an operation or a resource to observe a signal
    * to terminate it.
    */
   ```



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/KeyValueModifyPlan.java:
##########
@@ -141,7 +141,7 @@ public <RowT> AsyncCursor<InternalSqlRow> execute(
             result.whenCompleteAsync((res, err) -> 
firstPageReadyCallback.onPrefetchComplete(err), executor);
         }
 
-        ctx.scheduleTimeout(result);
+        ctx.subscribeToCancellation(result);

Review Comment:
   KV operations are not cancellable at the moment, thus only a timeout must 
complete the `result` future. Even a KV read holds locks (in the case of an 
external RW transaction) or starts an implicit transaction, which by contract 
must be finished before the cancellation future is released. I think it's 
better to revert the changes in `ExecutionContext`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.