This is an automated email from the ASF dual-hosted git repository.
apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 45b612417fb IGNITE-27856 Extend CancellationToken API (#7921)
45b612417fb is described below
commit 45b612417fb1ff7a4cc0fb8be01956906134ce08
Author: Vadim Pakhnushev <[email protected]>
AuthorDate: Mon Apr 6 11:40:10 2026 +0300
IGNITE-27856 Extend CancellationToken API (#7921)
---
.../org/apache/ignite/lang/CancelHandleImpl.java | 60 +++++-
.../org/apache/ignite/lang/CancellationToken.java | 20 ++
.../ignite/internal/client/sql/ClientSql.java | 2 +-
.../org/apache/ignite/lang/CancelHandleHelper.java | 11 -
.../ignite/lang/CancelHandleHelperSelfTest.java | 235 +++++++++++++++++----
5 files changed, 275 insertions(+), 53 deletions(-)
diff --git
a/modules/api/src/main/java/org/apache/ignite/lang/CancelHandleImpl.java
b/modules/api/src/main/java/org/apache/ignite/lang/CancelHandleImpl.java
index a28c97e3d00..4836e2cc526 100644
--- a/modules/api/src/main/java/org/apache/ignite/lang/CancelHandleImpl.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/CancelHandleImpl.java
@@ -18,6 +18,9 @@
package org.apache.ignite.lang;
import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.lang.ErrorGroups.Common;
@@ -69,6 +72,8 @@ final class CancelHandleImpl implements CancelHandle {
private final ArrayDeque<Cancellation> cancellations = new
ArrayDeque<>();
+ private final List<Runnable> listeners = new ArrayList<>();
+
private final CancelHandleImpl handle;
private final Object mux = new Object();
@@ -96,12 +101,47 @@ final class CancelHandleImpl implements CancelHandle {
}
}
- boolean isCancelled() {
+ @Override
+ public boolean isCancelled() {
return cancelFut != null;
}
+ @Override
+ public AutoCloseable addListener(Runnable callback) {
+ Objects.requireNonNull(callback, "callback");
+
+ if (cancelFut != null) {
+ runListener(callback);
+ return () -> { };
+ }
+
+ synchronized (mux) {
+ if (cancelFut == null) {
+ listeners.add(callback);
+ return () -> {
+ synchronized (mux) {
+ listeners.remove(callback);
+ }
+ };
+ }
+ }
+
+ runListener(callback);
+ return () -> { };
+ }
+
+ private static void runListener(Runnable callback) {
+ try {
+ callback.run();
+ } catch (Throwable t) {
+ throw new IgniteException(Common.INTERNAL_ERR, "Failed to
cancel an operation", t);
+ }
+ }
+
@SuppressWarnings("rawtypes")
void cancel() {
+ List<Runnable> listenersCopy;
+
if (cancelFut != null) {
return;
}
@@ -120,6 +160,9 @@ final class CancelHandleImpl implements CancelHandle {
cancelFut = CompletableFuture.allOf(futures).whenComplete((r,
t) -> {
handle.cancelFut.complete(null);
});
+
+ listenersCopy = new ArrayList<>(listeners);
+ listeners.clear();
}
IgniteException error = null;
@@ -136,6 +179,21 @@ final class CancelHandleImpl implements CancelHandle {
}
}
+ // Release references to cancellation actions after execution
+ cancellations.clear();
+
+ // Run listener callbacks outside of lock
+ for (Runnable listener : listenersCopy) {
+ try {
+ listener.run();
+ } catch (Throwable t) {
+ if (error == null) {
+ error = new IgniteException(Common.INTERNAL_ERR,
"Failed to cancel an operation");
+ }
+ error.addSuppressed(t);
+ }
+ }
+
if (error != null) {
throw error;
}
diff --git
a/modules/api/src/main/java/org/apache/ignite/lang/CancellationToken.java
b/modules/api/src/main/java/org/apache/ignite/lang/CancellationToken.java
index 52857cd3171..e6cb5c53161 100644
--- a/modules/api/src/main/java/org/apache/ignite/lang/CancellationToken.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/CancellationToken.java
@@ -22,4 +22,24 @@ package org.apache.ignite.lang;
* to terminate it.
*/
public interface CancellationToken {
+ /**
+ * Flag indicating whether cancellation was requested or not.
+ *
+ * <p>This method will return {@code true} even if cancellation has not
been completed yet.
+ *
+ * @return {@code true} when cancellation was requested.
+ */
+ boolean isCancelled();
+
+ /**
+ * Registers a callback to be executed when cancellation is requested. If
cancellation has already been requested,
+ * the callback is executed immediately.
+ *
+ * <p>The returned handle can be used to stop listening for cancellation
requests. It is important to close the handle
+ * when the callback is no longer needed to avoid memory leaks.
+ *
+ * @param callback Callback to execute when cancellation is requested.
+ * @return A handle which can be used to stop listening for cancellation
requests.
+ */
+ AutoCloseable addListener(Runnable callback);
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
index 93c5b9a6fe9..5fb3783f500 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
@@ -651,7 +651,7 @@ public class ClientSql implements IgniteSql {
private static void addCancelAction(CancellationToken cancellationToken,
PayloadOutputChannel ch) {
CompletableFuture<Void> cancelFuture = new CompletableFuture<>();
- if (CancelHandleHelper.isCancelled(cancellationToken)) {
+ if (cancellationToken.isCancelled()) {
throw new SqlException(Sql.EXECUTION_CANCELLED_ERR, "The query was
cancelled while executing.");
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/lang/CancelHandleHelper.java
b/modules/core/src/main/java/org/apache/ignite/lang/CancelHandleHelper.java
index f22c87ac136..f10477d1b43 100644
--- a/modules/core/src/main/java/org/apache/ignite/lang/CancelHandleHelper.java
+++ b/modules/core/src/main/java/org/apache/ignite/lang/CancelHandleHelper.java
@@ -78,17 +78,6 @@ public final class CancelHandleHelper {
addCancelAction(token, () -> completionFut.cancel(true),
completionFut);
}
- /**
- * Flag indicating whether cancellation was requested or not.
- *
- * <p>This method will return true even if cancellation has not been
completed yet.
- *
- * @return {@code True} if a cancellation was previously requested, {@code
false} otherwise.
- */
- public static boolean isCancelled(CancellationToken token) {
- return unwrapToken(token).isCancelled();
- }
-
private static CancellationTokenImpl unwrapToken(CancellationToken token) {
if (token instanceof CancellationTokenImpl) {
return (CancellationTokenImpl) token;
diff --git
a/modules/core/src/test/java/org/apache/ignite/lang/CancelHandleHelperSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/lang/CancelHandleHelperSelfTest.java
index 6e7dab74185..35f7c9b3f65 100644
---
a/modules/core/src/test/java/org/apache/ignite/lang/CancelHandleHelperSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/lang/CancelHandleHelperSelfTest.java
@@ -17,22 +17,23 @@
package org.apache.ignite.lang;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrows;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCode;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.arrayContaining;
+import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotSame;
-import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.lang.ErrorGroups.Common;
import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
/**
* Tests for {@link CancelHandleHelper}.
@@ -141,12 +142,13 @@ public class CancelHandleHelperSelfTest extends
BaseIgniteAbstractTest {
cancelHandle.cancel();
assertTrue(cancelHandle.isCancelled());
- Runnable action = Mockito.mock(Runnable.class);
+ AtomicInteger counter = new AtomicInteger();
+ Runnable action = counter::incrementAndGet;
CompletableFuture<Void> f = new CompletableFuture<>();
// Attach it to some operation hasn't completed yet
CancelHandleHelper.addCancelAction(token, action, f);
- verify(action, times(1)).run();
+ assertThat(counter.get(), is(1));
cancelHandle.cancelAsync().join();
// We do not wait for cancellation to complete because
@@ -155,7 +157,7 @@ public class CancelHandleHelperSelfTest extends
BaseIgniteAbstractTest {
// Action runs immediately
CancelHandleHelper.addCancelAction(token, action, f);
- verify(action, times(2)).run();
+ assertThat(counter.get(), is(2));
}
@Test
@@ -166,12 +168,13 @@ public class CancelHandleHelperSelfTest extends
BaseIgniteAbstractTest {
cancelHandle.cancelAsync();
assertTrue(cancelHandle.isCancelled());
- Runnable action = Mockito.mock(Runnable.class);
+ AtomicInteger counter = new AtomicInteger();
+ Runnable action = counter::incrementAndGet;
CompletableFuture<Void> f = new CompletableFuture<>();
// Attach it to some operation hasn't completed yet
CancelHandleHelper.addCancelAction(token, action, f);
- verify(action, times(1)).run();
+ assertThat(counter.get(), is(1));
cancelHandle.cancelAsync().join();
// We do not wait for cancellation to complete because
@@ -180,39 +183,33 @@ public class CancelHandleHelperSelfTest extends
BaseIgniteAbstractTest {
// Action runs immediately
CancelHandleHelper.addCancelAction(token, action, f);
- verify(action, times(2)).run();
+ assertThat(counter.get(), is(2));
}
@Test
public void testArgumentsMustNotBeNull() {
CancelHandle cancelHandle = CancelHandle.create();
CancellationToken token = cancelHandle.token();
- Runnable action = Mockito.mock(Runnable.class);
+ Runnable action = () -> {};
CompletableFuture<Void> f = nullCompletedFuture();
- {
- NullPointerException err = assertThrows(
- NullPointerException.class,
- () -> CancelHandleHelper.addCancelAction(null, action, f)
- );
- assertEquals("token", err.getMessage());
- }
-
- {
- NullPointerException err = assertThrows(
- NullPointerException.class,
- () -> CancelHandleHelper.addCancelAction(token, null, f)
- );
- assertEquals("cancelAction", err.getMessage());
- }
-
- {
- NullPointerException err = assertThrows(
- NullPointerException.class,
- () -> CancelHandleHelper.addCancelAction(token, action,
null)
- );
- assertEquals("completionFut", err.getMessage());
- }
+ assertThrows(
+ NullPointerException.class,
+ () -> CancelHandleHelper.addCancelAction(null, action, f),
+ "token"
+ );
+
+ assertThrows(
+ NullPointerException.class,
+ () -> CancelHandleHelper.addCancelAction(token, null, f),
+ "cancelAction"
+ );
+
+ assertThrows(
+ NullPointerException.class,
+ () -> CancelHandleHelper.addCancelAction(token, action, null),
+ "completionFut"
+ );
}
@Test
@@ -284,10 +281,168 @@ public class CancelHandleHelperSelfTest extends
BaseIgniteAbstractTest {
f1.complete(null);
f2.complete(null);
- IgniteException err = assertThrows(IgniteException.class,
cancelHandle::cancel);
+ Throwable err = assertThrowsWithCode(
+ IgniteException.class,
+ Common.INTERNAL_ERR,
+ cancelHandle::cancel,
+ "Failed to cancel an operation"
+ );
+
+ assertThat(err.getSuppressed(), arrayContaining(e1, e2));
+ }
+
+ @Test
+ public void testTokenIsCancelled() {
+ CancelHandle cancelHandle = CancelHandle.create();
+ CancellationToken token = cancelHandle.token();
+
+ assertThat(token.isCancelled(), is(false));
+
+ cancelHandle.cancel();
+
+ assertThat(token.isCancelled(), is(true));
+ }
+
+ @Test
+ public void testTokenIsCancelledAsync() {
+ CancelHandle cancelHandle = CancelHandle.create();
+ CancellationToken token = cancelHandle.token();
+
+ assertThat(token.isCancelled(), is(false));
+
+ cancelHandle.cancelAsync();
+
+ assertThat(token.isCancelled(), is(true));
+ }
+
+ @Test
+ public void testListenCallbackInvokedOnCancel() {
+ CancelHandle cancelHandle = CancelHandle.create();
+ CancellationToken token = cancelHandle.token();
+
+ AtomicBoolean callbackCalled = new AtomicBoolean();
+ token.addListener(() -> callbackCalled.set(true));
+
+ assertThat(callbackCalled.get(), is(false));
+
+ cancelHandle.cancel();
+
+ assertThat(callbackCalled.get(), is(true));
+ }
+
+ @Test
+ public void testListenCallbackInvokedImmediatelyIfAlreadyCancelled() {
+ CancelHandle cancelHandle = CancelHandle.create();
+ CancellationToken token = cancelHandle.token();
+
+ cancelHandle.cancel();
+
+ AtomicBoolean callbackCalled = new AtomicBoolean();
+ token.addListener(() -> callbackCalled.set(true));
+
+ assertThat(callbackCalled.get(), is(true));
+ }
+
+ @Test
+ public void testListenMultipleCallbacks() {
+ CancelHandle cancelHandle = CancelHandle.create();
+ CancellationToken token = cancelHandle.token();
+
+ AtomicInteger counter = new AtomicInteger();
+ token.addListener(counter::incrementAndGet);
+ token.addListener(counter::incrementAndGet);
+ token.addListener(counter::incrementAndGet);
+
+ cancelHandle.cancel();
+
+ assertThat(counter.get(), is(3));
+ }
+
+ @Test
+ public void testListenCloseRemovesCallback() throws Exception {
+ CancelHandle cancelHandle = CancelHandle.create();
+ CancellationToken token = cancelHandle.token();
+
+ AtomicBoolean callbackCalled = new AtomicBoolean();
+ AutoCloseable handle = token.addListener(() ->
callbackCalled.set(true));
+
+ // Close the handle before cancellation
+ handle.close();
+
+ cancelHandle.cancel();
+
+ // Callback should not have been called
+ assertThat(callbackCalled.get(), is(false));
+ }
+
+ @Test
+ public void testListenCloseAfterCancelIsNoOp() throws Exception {
+ CancelHandle cancelHandle = CancelHandle.create();
+ CancellationToken token = cancelHandle.token();
+
+ AtomicBoolean callbackCalled = new AtomicBoolean();
+ AutoCloseable handle = token.addListener(() ->
callbackCalled.set(true));
+
+ cancelHandle.cancel();
+ assertThat(callbackCalled.get(), is(true));
+
+ // Close after cancel - should not throw
+ handle.close();
+ }
+
+ @Test
+ public void testListenNullCallbackThrows() {
+ CancelHandle cancelHandle = CancelHandle.create();
+ CancellationToken token = cancelHandle.token();
+
+ assertThrows(
+ NullPointerException.class,
+ () -> token.addListener(null),
+ "callback"
+ );
+ }
+
+ @Test
+ public void testExceptionInListenerCallbackWrappedWhenAlreadyCancelled() {
+ CancelHandle cancelHandle = CancelHandle.create();
+ CancellationToken token = cancelHandle.token();
+
+ cancelHandle.cancel();
+
+ // Listener registered after cancellation should wrap exceptions
consistently
+ // with the cancel() path (IgniteException with INTERNAL_ERR).
+ assertThrowsWithCode(
+ IgniteException.class,
+ Common.INTERNAL_ERR,
+ () -> token.addListener(() -> {
+ throw new RuntimeException("listener error");
+ }),
+ "Failed to cancel an operation"
+ );
+ }
+
+ @Test
+ public void testExceptionsInListenerCallbacksAreWrapped() {
+ CancelHandle cancelHandle = CancelHandle.create();
+ CancellationToken token = cancelHandle.token();
+
+ RuntimeException e1 = new RuntimeException("e1");
+ token.addListener(() -> {
+ throw e1;
+ });
+
+ RuntimeException e2 = new RuntimeException("e2");
+ token.addListener(() -> {
+ throw e2;
+ });
+
+ Throwable err = assertThrowsWithCode(
+ IgniteException.class,
+ Common.INTERNAL_ERR,
+ cancelHandle::cancel,
+ "Failed to cancel an operation"
+ );
- assertEquals("Failed to cancel an operation", err.getMessage());
- assertEquals(Common.INTERNAL_ERR, err.code(), err.toString());
- assertEquals(Arrays.asList(e1, e2),
Arrays.asList(err.getSuppressed()));
+ assertThat(err.getSuppressed(), arrayContaining(e1, e2));
}
}