This is an automated email from the ASF dual-hosted git repository.

apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 45b612417fb IGNITE-27856 Extend CancellationToken API (#7921)
45b612417fb is described below

commit 45b612417fb1ff7a4cc0fb8be01956906134ce08
Author: Vadim Pakhnushev <[email protected]>
AuthorDate: Mon Apr 6 11:40:10 2026 +0300

    IGNITE-27856 Extend CancellationToken API (#7921)
---
 .../org/apache/ignite/lang/CancelHandleImpl.java   |  60 +++++-
 .../org/apache/ignite/lang/CancellationToken.java  |  20 ++
 .../ignite/internal/client/sql/ClientSql.java      |   2 +-
 .../org/apache/ignite/lang/CancelHandleHelper.java |  11 -
 .../ignite/lang/CancelHandleHelperSelfTest.java    | 235 +++++++++++++++++----
 5 files changed, 275 insertions(+), 53 deletions(-)

diff --git a/modules/api/src/main/java/org/apache/ignite/lang/CancelHandleImpl.java b/modules/api/src/main/java/org/apache/ignite/lang/CancelHandleImpl.java
index a28c97e3d00..4836e2cc526 100644
--- a/modules/api/src/main/java/org/apache/ignite/lang/CancelHandleImpl.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/CancelHandleImpl.java
@@ -18,6 +18,9 @@
 package org.apache.ignite.lang;
 
 import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.lang.ErrorGroups.Common;
 
@@ -69,6 +72,8 @@ final class CancelHandleImpl implements CancelHandle {
 
         private final ArrayDeque<Cancellation> cancellations = new ArrayDeque<>();
 
+        private final List<Runnable> listeners = new ArrayList<>();
+
         private final CancelHandleImpl handle;
 
         private final Object mux = new Object();
@@ -96,12 +101,47 @@ final class CancelHandleImpl implements CancelHandle {
             }
         }
 
-        boolean isCancelled() {
+        @Override
+        public boolean isCancelled() {
             return cancelFut != null;
         }
 
+        @Override
+        public AutoCloseable addListener(Runnable callback) {
+            Objects.requireNonNull(callback, "callback");
+
+            if (cancelFut != null) {
+                runListener(callback);
+                return () -> { };
+            }
+
+            synchronized (mux) {
+                if (cancelFut == null) {
+                    listeners.add(callback);
+                    return () -> {
+                        synchronized (mux) {
+                            listeners.remove(callback);
+                        }
+                    };
+                }
+            }
+
+            runListener(callback);
+            return () -> { };
+        }
+
+        private static void runListener(Runnable callback) {
+            try {
+                callback.run();
+            } catch (Throwable t) {
+                throw new IgniteException(Common.INTERNAL_ERR, "Failed to cancel an operation", t);
+            }
+        }
+
         @SuppressWarnings("rawtypes")
         void cancel() {
+            List<Runnable> listenersCopy;
+
             if (cancelFut != null) {
                 return;
             }
@@ -120,6 +160,9 @@ final class CancelHandleImpl implements CancelHandle {
                 cancelFut = CompletableFuture.allOf(futures).whenComplete((r, t) -> {
                     handle.cancelFut.complete(null);
                 });
+
+                listenersCopy = new ArrayList<>(listeners);
+                listeners.clear();
             }
 
             IgniteException error = null;
@@ -136,6 +179,21 @@ final class CancelHandleImpl implements CancelHandle {
                 }
             }
 
+            // Release references to cancellation actions after execution
+            cancellations.clear();
+
+            // Run listener callbacks outside of lock
+            for (Runnable listener : listenersCopy) {
+                try {
+                    listener.run();
+                } catch (Throwable t) {
+                    if (error == null) {
+                        error = new IgniteException(Common.INTERNAL_ERR, "Failed to cancel an operation");
+                    }
+                    error.addSuppressed(t);
+                }
+            }
+
             if (error != null) {
                 throw error;
             }
diff --git a/modules/api/src/main/java/org/apache/ignite/lang/CancellationToken.java b/modules/api/src/main/java/org/apache/ignite/lang/CancellationToken.java
index 52857cd3171..e6cb5c53161 100644
--- a/modules/api/src/main/java/org/apache/ignite/lang/CancellationToken.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/CancellationToken.java
@@ -22,4 +22,24 @@ package org.apache.ignite.lang;
  * to terminate it.
  */
 public interface CancellationToken {
+    /**
+     * Flag indicating whether cancellation was requested or not.
+     *
+     * <p>This method will return {@code true} even if cancellation has not been completed yet.
+     *
+     * @return {@code true} when cancellation was requested.
+     */
+    boolean isCancelled();
+
+    /**
+     * Registers a callback to be executed when cancellation is requested. If cancellation has already been requested,
+     * the callback is executed immediately.
+     *
+     * <p>The returned handle can be used to stop listening for cancellation requests. It is important to close the handle
+     * when the callback is no longer needed to avoid memory leaks.
+     *
+     * @param callback Callback to execute when cancellation is requested.
+     * @return A handle which can be used to stop listening for cancellation requests.
+     */
+    AutoCloseable addListener(Runnable callback);
 }
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
index 93c5b9a6fe9..5fb3783f500 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
@@ -651,7 +651,7 @@ public class ClientSql implements IgniteSql {
     private static void addCancelAction(CancellationToken cancellationToken, PayloadOutputChannel ch) {
         CompletableFuture<Void> cancelFuture = new CompletableFuture<>();
 
-        if (CancelHandleHelper.isCancelled(cancellationToken)) {
+        if (cancellationToken.isCancelled()) {
             throw new SqlException(Sql.EXECUTION_CANCELLED_ERR, "The query was cancelled while executing.");
         }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/lang/CancelHandleHelper.java b/modules/core/src/main/java/org/apache/ignite/lang/CancelHandleHelper.java
index f22c87ac136..f10477d1b43 100644
--- a/modules/core/src/main/java/org/apache/ignite/lang/CancelHandleHelper.java
+++ b/modules/core/src/main/java/org/apache/ignite/lang/CancelHandleHelper.java
@@ -78,17 +78,6 @@ public final class CancelHandleHelper {
         addCancelAction(token, () -> completionFut.cancel(true), completionFut);
     }
 
-    /**
-     * Flag indicating whether cancellation was requested or not.
-     *
-     * <p>This method will return true even if cancellation has not been completed yet.
-     *
-     * @return {@code True} if a cancellation was previously requested, {@code false} otherwise.
-     */
-    public static boolean isCancelled(CancellationToken token) {
-        return unwrapToken(token).isCancelled();
-    }
-
     private static CancellationTokenImpl unwrapToken(CancellationToken token) {
         if (token instanceof CancellationTokenImpl) {
             return (CancellationTokenImpl) token;
diff --git a/modules/core/src/test/java/org/apache/ignite/lang/CancelHandleHelperSelfTest.java b/modules/core/src/test/java/org/apache/ignite/lang/CancelHandleHelperSelfTest.java
index 6e7dab74185..35f7c9b3f65 100644
--- a/modules/core/src/test/java/org/apache/ignite/lang/CancelHandleHelperSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/lang/CancelHandleHelperSelfTest.java
@@ -17,22 +17,23 @@
 
 package org.apache.ignite.lang;
 
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrows;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCode;
 import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.arrayContaining;
+import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotSame;
-import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 
-import java.util.Arrays;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.lang.ErrorGroups.Common;
 import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
 
 /**
  * Tests for {@link CancelHandleHelper}.
@@ -141,12 +142,13 @@ public class CancelHandleHelperSelfTest extends BaseIgniteAbstractTest {
         cancelHandle.cancel();
         assertTrue(cancelHandle.isCancelled());
 
-        Runnable action = Mockito.mock(Runnable.class);
+        AtomicInteger counter = new AtomicInteger();
+        Runnable action = counter::incrementAndGet;
         CompletableFuture<Void> f = new CompletableFuture<>();
 
         // Attach it to some operation hasn't completed yet
         CancelHandleHelper.addCancelAction(token, action, f);
-        verify(action, times(1)).run();
+        assertThat(counter.get(), is(1));
 
         cancelHandle.cancelAsync().join();
         // We do not wait for cancellation to complete because
@@ -155,7 +157,7 @@ public class CancelHandleHelperSelfTest extends BaseIgniteAbstractTest {
 
         // Action runs immediately
         CancelHandleHelper.addCancelAction(token, action, f);
-        verify(action, times(2)).run();
+        assertThat(counter.get(), is(2));
     }
 
     @Test
@@ -166,12 +168,13 @@ public class CancelHandleHelperSelfTest extends BaseIgniteAbstractTest {
         cancelHandle.cancelAsync();
         assertTrue(cancelHandle.isCancelled());
 
-        Runnable action = Mockito.mock(Runnable.class);
+        AtomicInteger counter = new AtomicInteger();
+        Runnable action = counter::incrementAndGet;
         CompletableFuture<Void> f = new CompletableFuture<>();
 
         // Attach it to some operation hasn't completed yet
         CancelHandleHelper.addCancelAction(token, action, f);
-        verify(action, times(1)).run();
+        assertThat(counter.get(), is(1));
 
         cancelHandle.cancelAsync().join();
         // We do not wait for cancellation to complete because
@@ -180,39 +183,33 @@ public class CancelHandleHelperSelfTest extends BaseIgniteAbstractTest {
 
         // Action runs immediately
         CancelHandleHelper.addCancelAction(token, action, f);
-        verify(action, times(2)).run();
+        assertThat(counter.get(), is(2));
     }
 
     @Test
     public void testArgumentsMustNotBeNull() {
         CancelHandle cancelHandle = CancelHandle.create();
         CancellationToken token = cancelHandle.token();
-        Runnable action = Mockito.mock(Runnable.class);
+        Runnable action = () -> {};
         CompletableFuture<Void> f = nullCompletedFuture();
 
-        {
-            NullPointerException err = assertThrows(
-                    NullPointerException.class,
-                    () -> CancelHandleHelper.addCancelAction(null, action, f)
-            );
-            assertEquals("token", err.getMessage());
-        }
-
-        {
-            NullPointerException err = assertThrows(
-                    NullPointerException.class,
-                    () -> CancelHandleHelper.addCancelAction(token, null, f)
-            );
-            assertEquals("cancelAction", err.getMessage());
-        }
-
-        {
-            NullPointerException err = assertThrows(
-                    NullPointerException.class,
-                    () -> CancelHandleHelper.addCancelAction(token, action, null)
-            );
-            assertEquals("completionFut", err.getMessage());
-        }
+        assertThrows(
+                NullPointerException.class,
+                () -> CancelHandleHelper.addCancelAction(null, action, f),
+                "token"
+        );
+
+        assertThrows(
+                NullPointerException.class,
+                () -> CancelHandleHelper.addCancelAction(token, null, f),
+                "cancelAction"
+        );
+
+        assertThrows(
+                NullPointerException.class,
+                () -> CancelHandleHelper.addCancelAction(token, action, null),
+                "completionFut"
+        );
     }
 
     @Test
@@ -284,10 +281,168 @@ public class CancelHandleHelperSelfTest extends BaseIgniteAbstractTest {
         f1.complete(null);
         f2.complete(null);
 
-        IgniteException err = assertThrows(IgniteException.class, cancelHandle::cancel);
+        Throwable err = assertThrowsWithCode(
+                IgniteException.class,
+                Common.INTERNAL_ERR,
+                cancelHandle::cancel,
+                "Failed to cancel an operation"
+        );
+
+        assertThat(err.getSuppressed(), arrayContaining(e1, e2));
+    }
+
+    @Test
+    public void testTokenIsCancelled() {
+        CancelHandle cancelHandle = CancelHandle.create();
+        CancellationToken token = cancelHandle.token();
+
+        assertThat(token.isCancelled(), is(false));
+
+        cancelHandle.cancel();
+
+        assertThat(token.isCancelled(), is(true));
+    }
+
+    @Test
+    public void testTokenIsCancelledAsync() {
+        CancelHandle cancelHandle = CancelHandle.create();
+        CancellationToken token = cancelHandle.token();
+
+        assertThat(token.isCancelled(), is(false));
+
+        cancelHandle.cancelAsync();
+
+        assertThat(token.isCancelled(), is(true));
+    }
+
+    @Test
+    public void testListenCallbackInvokedOnCancel() {
+        CancelHandle cancelHandle = CancelHandle.create();
+        CancellationToken token = cancelHandle.token();
+
+        AtomicBoolean callbackCalled = new AtomicBoolean();
+        token.addListener(() -> callbackCalled.set(true));
+
+        assertThat(callbackCalled.get(), is(false));
+
+        cancelHandle.cancel();
+
+        assertThat(callbackCalled.get(), is(true));
+    }
+
+    @Test
+    public void testListenCallbackInvokedImmediatelyIfAlreadyCancelled() {
+        CancelHandle cancelHandle = CancelHandle.create();
+        CancellationToken token = cancelHandle.token();
+
+        cancelHandle.cancel();
+
+        AtomicBoolean callbackCalled = new AtomicBoolean();
+        token.addListener(() -> callbackCalled.set(true));
+
+        assertThat(callbackCalled.get(), is(true));
+    }
+
+    @Test
+    public void testListenMultipleCallbacks() {
+        CancelHandle cancelHandle = CancelHandle.create();
+        CancellationToken token = cancelHandle.token();
+
+        AtomicInteger counter = new AtomicInteger();
+        token.addListener(counter::incrementAndGet);
+        token.addListener(counter::incrementAndGet);
+        token.addListener(counter::incrementAndGet);
+
+        cancelHandle.cancel();
+
+        assertThat(counter.get(), is(3));
+    }
+
+    @Test
+    public void testListenCloseRemovesCallback() throws Exception {
+        CancelHandle cancelHandle = CancelHandle.create();
+        CancellationToken token = cancelHandle.token();
+
+        AtomicBoolean callbackCalled = new AtomicBoolean();
+        AutoCloseable handle = token.addListener(() -> callbackCalled.set(true));
+
+        // Close the handle before cancellation
+        handle.close();
+
+        cancelHandle.cancel();
+
+        // Callback should not have been called
+        assertThat(callbackCalled.get(), is(false));
+    }
+
+    @Test
+    public void testListenCloseAfterCancelIsNoOp() throws Exception {
+        CancelHandle cancelHandle = CancelHandle.create();
+        CancellationToken token = cancelHandle.token();
+
+        AtomicBoolean callbackCalled = new AtomicBoolean();
+        AutoCloseable handle = token.addListener(() -> callbackCalled.set(true));
+
+        cancelHandle.cancel();
+        assertThat(callbackCalled.get(), is(true));
+
+        // Close after cancel - should not throw
+        handle.close();
+    }
+
+    @Test
+    public void testListenNullCallbackThrows() {
+        CancelHandle cancelHandle = CancelHandle.create();
+        CancellationToken token = cancelHandle.token();
+
+        assertThrows(
+                NullPointerException.class,
+                () -> token.addListener(null),
+                "callback"
+        );
+    }
+
+    @Test
+    public void testExceptionInListenerCallbackWrappedWhenAlreadyCancelled() {
+        CancelHandle cancelHandle = CancelHandle.create();
+        CancellationToken token = cancelHandle.token();
+
+        cancelHandle.cancel();
+
+        // Listener registered after cancellation should wrap exceptions consistently
+        // with the cancel() path (IgniteException with INTERNAL_ERR).
+        assertThrowsWithCode(
+                IgniteException.class,
+                Common.INTERNAL_ERR,
+                () -> token.addListener(() -> {
+                    throw new RuntimeException("listener error");
+                }),
+                "Failed to cancel an operation"
+        );
+    }
+
+    @Test
+    public void testExceptionsInListenerCallbacksAreWrapped() {
+        CancelHandle cancelHandle = CancelHandle.create();
+        CancellationToken token = cancelHandle.token();
+
+        RuntimeException e1 = new RuntimeException("e1");
+        token.addListener(() -> {
+            throw e1;
+        });
+
+        RuntimeException e2 = new RuntimeException("e2");
+        token.addListener(() -> {
+            throw e2;
+        });
+
+        Throwable err = assertThrowsWithCode(
+                IgniteException.class,
+                Common.INTERNAL_ERR,
+                cancelHandle::cancel,
+                "Failed to cancel an operation"
+        );
 
-        assertEquals("Failed to cancel an operation", err.getMessage());
-        assertEquals(Common.INTERNAL_ERR, err.code(), err.toString());
-        assertEquals(Arrays.asList(e1, e2), Arrays.asList(err.getSuppressed()));
+        assertThat(err.getSuppressed(), arrayContaining(e1, e2));
     }
 }

Reply via email to