This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new ead4b1e798 IGNITE-21714 Prevent threads from being hijacked in
KV/Record view APIs (#3378)
ead4b1e798 is described below
commit ead4b1e798fe7106287702f7517b1d4453d4315d
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Mar 8 17:23:31 2024 +0400
IGNITE-21714 Prevent threads from being hijacked in KV/Record view APIs
(#3378)
---
buildscripts/java-integration-test.gradle | 2 +
buildscripts/java-junit5.gradle | 1 +
.../handler/ClientInboundMessageHandler.java | 13 +-
.../internal/lang/IgniteExceptionMapperUtil.java | 6 +
.../ignite/internal/thread/PublicApiThreading.java | 67 ++++
.../ignite/internal/index/IndexManagerTest.java | 3 +-
.../runner/app/ItIgniteNodeRestartTest.java | 4 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 7 +-
.../internal/test/WatchListenerInhibitor.java | 30 ++
modules/table/build.gradle | 1 -
.../rebalance/ItRebalanceDistributedTest.java | 4 +-
.../threading/ItKvRecordApiThreadingTest.java | 382 +++++++++++++++++++++
.../ignite/internal/table/AbstractTableView.java | 94 ++++-
.../internal/table/KeyValueBinaryViewImpl.java | 81 +++--
.../ignite/internal/table/KeyValueViewImpl.java | 94 ++---
.../internal/table/RecordBinaryViewImpl.java | 86 +++--
.../ignite/internal/table/RecordViewImpl.java | 84 +++--
.../apache/ignite/internal/table/TableImpl.java | 20 +-
.../internal/table/distributed/TableManager.java | 18 +-
.../table/KeyValueBinaryViewOperationsTest.java | 5 +-
.../internal/table/KeyValueViewOperationsTest.java | 4 +-
.../table/RecordBinaryViewOperationsTest.java | 8 +-
.../internal/table/RecordViewOperationsTest.java | 6 +-
.../table/distributed/TableManagerTest.java | 4 +-
24 files changed, 832 insertions(+), 192 deletions(-)
diff --git a/buildscripts/java-integration-test.gradle
b/buildscripts/java-integration-test.gradle
index a5793d66fe..7e3167c7a8 100644
--- a/buildscripts/java-integration-test.gradle
+++ b/buildscripts/java-integration-test.gradle
@@ -30,6 +30,8 @@ testing {
implementation libs.junit5.api
implementation libs.junit5.impl
implementation libs.junit5.params
+ implementation libs.junit.pioneer
+
implementation libs.mockito.core
implementation libs.mockito.junit
implementation libs.hamcrest.core
diff --git a/buildscripts/java-junit5.gradle b/buildscripts/java-junit5.gradle
index 5ca6ba1e29..2084cb536b 100644
--- a/buildscripts/java-junit5.gradle
+++ b/buildscripts/java-junit5.gradle
@@ -34,6 +34,7 @@ dependencies {
testImplementation libs.junit5.impl
testImplementation libs.junit5.api
testImplementation libs.junit5.params
+ testImplementation libs.junit.pioneer
testImplementation libs.jansi.core
testImplementation libs.log4j.api
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index 8979942cf5..a04e1f0c41 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -128,6 +128,7 @@ import
org.apache.ignite.internal.table.IgniteTablesInternal;
import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
import org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
import org.apache.ignite.internal.table.distributed.schema.SchemaVersionsImpl;
+import org.apache.ignite.internal.thread.PublicApiThreading;
import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.lang.IgniteException;
@@ -566,7 +567,17 @@ public class ClientInboundMessageHandler extends
ChannelInboundHandlerAdapter im
// Observable timestamp should be calculated after the operation
is processed; reserve space, write later.
int observableTimestampIdx = out.reserveLong();
- CompletableFuture fut = processOperation(in, out, opCode,
requestId);
+ CompletableFuture fut;
+
+ // Enclosing in 'internal call' to save resubmission to the async
continuation thread pool on return. This will only
+ // work if the corresponding call (like an async KeyValueView
method) is invoked in this same thread, but in most cases this
+ // will be true.
+ PublicApiThreading.startInternalCall();
+ try {
+ fut = processOperation(in, out, opCode, requestId);
+ } finally {
+ PublicApiThreading.endInternalCall();
+ }
if (fut == null) {
// Operation completed synchronously.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/lang/IgniteExceptionMapperUtil.java
b/modules/core/src/main/java/org/apache/ignite/internal/lang/IgniteExceptionMapperUtil.java
index fbae12e574..b77c94023b 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/lang/IgniteExceptionMapperUtil.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/lang/IgniteExceptionMapperUtil.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.lang;
import static java.util.Collections.unmodifiableMap;
+import static
org.apache.ignite.internal.util.CompletableFutures.isCompletedSuccessfully;
import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
@@ -126,6 +127,11 @@ public class IgniteExceptionMapperUtil {
* @return New CompletableFuture.
*/
public static <T> CompletableFuture<T>
convertToPublicFuture(CompletableFuture<T> origin) {
+ if (isCompletedSuccessfully(origin)) {
+ // No need to translate exceptions.
+ return origin;
+ }
+
return origin
.handle((res, err) -> {
if (err != null) {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/thread/PublicApiThreading.java
b/modules/core/src/main/java/org/apache/ignite/internal/thread/PublicApiThreading.java
new file mode 100644
index 0000000000..fa82b26717
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/thread/PublicApiThreading.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.thread;
+
+import java.util.function.Supplier;
+
+/**
+ * Logic related to the threading concern of public APIs: it allows us to
raise/clear/check a thread-local flag which gives us the ability to
+ * see whether a public API call actually comes from Ignite internal code
and not from an actual user.
+ */
+public class PublicApiThreading {
+ private static final ThreadLocal<Boolean> INTERNAL_CALL = new
ThreadLocal<>();
+
+ /**
+ * Raises the 'internal call' state; the state is stored in the current
thread.
+ */
+ public static void startInternalCall() {
+ INTERNAL_CALL.set(true);
+ }
+
+ /**
+ * Clears the 'internal call' state; the state is stored in the current
thread.
+ */
+ public static void endInternalCall() {
+ INTERNAL_CALL.remove();
+ }
+
+ /**
+ * Executes the call while the 'internal call' state is raised; so if the
call invokes {@link #inInternalCall()},
+ * it will get {@code true}.
+ *
+ * @param call Call to execute as internal.
+ * @return Call result.
+ */
+ public static <T> T doInternalCall(Supplier<T> call) {
+ startInternalCall();
+
+ try {
+ return call.get();
+ } finally {
+ endInternalCall();
+ }
+ }
+
+ /**
+ * Returns {@code true} if the 'internal call' status is currently raised
in the current thread.
+ */
+ public static boolean inInternalCall() {
+ Boolean value = INTERNAL_CALL.get();
+ return value != null && value;
+ }
+}
diff --git
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
index b6ee9dee73..7c1f7b1efa 100644
---
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
+++
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
@@ -199,7 +199,8 @@ public class IndexManagerTest extends
BaseIgniteAbstractTest {
new ConstantSchemaVersions(1),
marshallers,
mock(IgniteSql.class),
- table.primaryKeyIndexId()
+ table.primaryKeyIndexId(),
+ ForkJoinPool.commonPool()
));
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 1f9d7a988e..e468284ba0 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -58,6 +58,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -571,7 +572,8 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
sqlRef::get,
resourcesRegistry,
rebalanceScheduler,
- lowWatermark
+ lowWatermark,
+ ForkJoinPool.commonPool()
);
var indexManager = new IndexManager(
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 59514b0468..11f2ac141e 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -39,8 +39,10 @@ import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.function.BiPredicate;
@@ -376,6 +378,8 @@ public class IgniteImpl implements Ignite {
/** Remote triggered resources registry. */
private final RemotelyTriggeredResourceRegistry resourcesRegistry;
+ private final Executor asyncContinuationExecutor =
ForkJoinPool.commonPool();
+
/**
* The Constructor.
*
@@ -729,7 +733,8 @@ public class IgniteImpl implements Ignite {
this::sql,
resourcesRegistry,
rebalanceScheduler,
- lowWatermark
+ lowWatermark,
+ asyncContinuationExecutor
);
indexManager = new IndexManager(
diff --git
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/test/WatchListenerInhibitor.java
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/test/WatchListenerInhibitor.java
index b28365b3a5..a93ed1068a 100644
---
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/test/WatchListenerInhibitor.java
+++
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/test/WatchListenerInhibitor.java
@@ -23,6 +23,7 @@ import static
org.apache.ignite.internal.testframework.IgniteTestUtils.getFieldV
import java.lang.reflect.Field;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReadWriteLock;
+import java.util.function.Supplier;
import org.apache.ignite.Ignite;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
@@ -111,4 +112,33 @@ public class WatchListenerInhibitor {
public void stopInhibit() {
inhibitFuture.complete(null);
}
+
+ /**
+ * Executes an action enclosed in watch inhibition: that is, before
execution inhibition gets started, and after the execution
+ * it gets stopped.
+ *
+ * @param action Action to execute.
+ * @return Action result.
+ */
+ public <T> T withInhibition(Supplier<? extends T> action) {
+ startInhibit();
+
+ try {
+ return action.get();
+ } finally {
+ stopInhibit();
+ }
+ }
+
+ /**
+ * Executes an action enclosed in watch inhibition: that is, before
execution inhibition gets started, and after the execution
+ * it gets stopped.
+ *
+ * @param ignite Node on which to inhibit watch processing.
+ * @param action Action to execute.
+ * @return Action result.
+ */
+ public static <T> T withInhibition(Ignite ignite, Supplier<? extends T>
action) {
+ return metastorageEventsInhibitor(ignite).withInhibition(action);
+ }
}
diff --git a/modules/table/build.gradle b/modules/table/build.gradle
index 009354c26b..bd52b7a05b 100644
--- a/modules/table/build.gradle
+++ b/modules/table/build.gradle
@@ -80,7 +80,6 @@ dependencies {
testImplementation libs.mockito.junit
testImplementation libs.hamcrest.core
testImplementation libs.hamcrest.optional
- testImplementation libs.junit.pioneer
testImplementation libs.jmh.core
testImplementation libs.javax.annotations
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 33354bd9f1..0a2b0e47b0 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -72,6 +72,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -1215,7 +1216,8 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
() -> mock(IgniteSql.class),
resourcesRegistry,
rebalanceScheduler,
- lowWatermark
+ lowWatermark,
+ ForkJoinPool.commonPool()
) {
@Override
protected TxStateTableStorage createTxStateTableStorage(
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/threading/ItKvRecordApiThreadingTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/threading/ItKvRecordApiThreadingTest.java
new file mode 100644
index 0000000000..a43c8b9ea6
--- /dev/null
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/threading/ItKvRecordApiThreadingTest.java
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.threading;
+
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.both;
+import static org.hamcrest.Matchers.hasProperty;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.startsWith;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiFunction;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
+import org.apache.ignite.internal.lang.IgniteSystemProperties;
+import org.apache.ignite.internal.streamer.SimplePublisher;
+import org.apache.ignite.internal.test.WatchListenerInhibitor;
+import org.apache.ignite.internal.testframework.TestIgnitionManager;
+import org.apache.ignite.internal.testframework.WithSystemProperty;
+import org.apache.ignite.internal.thread.IgniteThread;
+import org.apache.ignite.internal.thread.PublicApiThreading;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.RecordView;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.Tuple;
+import org.hamcrest.Matcher;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junitpioneer.jupiter.cartesian.CartesianTest;
+import org.junitpioneer.jupiter.cartesian.CartesianTest.Enum;
+
+@WithSystemProperty(key = IgniteSystemProperties.THREAD_ASSERTIONS_ENABLED,
value = "false")
+@SuppressWarnings("resource")
+class ItKvRecordApiThreadingTest extends ClusterPerClassIntegrationTest {
+ private static final String TABLE_NAME = "test";
+ private static final int KEY = 1;
+
+ private static final Record KEY_RECORD = new Record(1, "");
+
+ @Override
+ protected int initialNodes() {
+ return 1;
+ }
+
+ @BeforeAll
+ void createTable() {
+ sql("CREATE TABLE " + TABLE_NAME + " (id INT PRIMARY KEY, val
VARCHAR)");
+ }
+
+ @BeforeEach
+ void upsertRecord() {
+ plainKeyValueView().put(null, KEY, "one");
+ }
+
+ private static KeyValueView<Integer, String> plainKeyValueView() {
+ return testTable().keyValueView(Integer.class, String.class);
+ }
+
+ private static KeyValueView<Tuple, Tuple> binaryKeyValueView() {
+ return testTable().keyValueView();
+ }
+
+ private static Table testTable() {
+ return CLUSTER.aliveNode().tables().table(TABLE_NAME);
+ }
+
+ @SuppressWarnings("rawtypes")
+ @CartesianTest
+ void keyValueViewFuturesCompleteInContinuationsPool(
+ @Enum KeyValueViewAsyncOperation operation,
+ @Enum KeyValueViewKind kind
+ ) {
+ assumeTrue(kind.supportsGetNullable() || !operation.isGetNullable());
+
+ KeyValueView tableView = kind.view();
+
+ @SuppressWarnings("unchecked") CompletableFuture<Thread>
completerFuture = forcingSwitchFromUserThread(
+ () -> operation.executeOn(tableView, kind.context())
+ .thenApply(unused -> Thread.currentThread())
+ );
+
+ assertThat(completerFuture, willBe(commonPoolThread()));
+ }
+
+ private static Matcher<Object> commonPoolThread() {
+ return both(hasProperty("name",
startsWith("ForkJoinPool.commonPool-worker-")))
+ .and(not(instanceOf(IgniteThread.class)));
+ }
+
+ private static <T> T forcingSwitchFromUserThread(Supplier<? extends T>
action) {
+ return WatchListenerInhibitor.withInhibition(CLUSTER.aliveNode(), ()
-> {
+ waitForSchemaSyncRequiringWait();
+
+ return action.get();
+ });
+ }
+
+ private static void waitForSchemaSyncRequiringWait() {
+ try {
+ Thread.sleep(TestIgnitionManager.DEFAULT_DELAY_DURATION_MS + 1);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static KeyValueContext<Integer, String> plainKeyValueContext() {
+ return new KeyValueContext<>(KEY, "one", "two");
+ }
+
+ private static KeyValueContext<Tuple, Tuple> binaryKeyValueContext() {
+ return new KeyValueContext<>(Tuple.create().set("id", KEY),
Tuple.create().set("val", "one"), Tuple.create().set("val", "two"));
+ }
+
+ @SuppressWarnings("rawtypes")
+ @CartesianTest
+ void
keyValueViewFuturesFromInternalCallsAreNotResubmittedToContinuationsPool(
+ @Enum KeyValueViewAsyncOperation operation,
+ @Enum KeyValueViewKind kind
+ ) {
+ assumeTrue(kind.supportsGetNullable() || !operation.isGetNullable());
+
+ KeyValueView tableView = kind.view();
+
+ @SuppressWarnings("unchecked") CompletableFuture<Thread>
completerFuture = forcingSwitchFromUserThread(
+ () -> PublicApiThreading.doInternalCall(
+ () -> operation.executeOn(tableView, kind.context())
+ .thenApply(unused -> Thread.currentThread())
+ )
+ );
+
+ assertThat(completerFuture, willBe(anIgniteThread()));
+ }
+
+ private static Matcher<Object> anIgniteThread() {
+ return instanceOf(IgniteThread.class);
+ }
+
+ @SuppressWarnings("rawtypes")
+ @CartesianTest
+ void recordViewFuturesCompleteInContinuationsPool(
+ @Enum RecordViewAsyncOperation operation,
+ @Enum RecordViewKind kind
+ ) {
+ RecordView tableView = kind.view();
+
+ @SuppressWarnings("unchecked") CompletableFuture<Thread>
completerFuture = forcingSwitchFromUserThread(
+ () -> operation.executeOn(tableView, kind.context())
+ .thenApply(unused -> Thread.currentThread())
+ );
+
+ assertThat(completerFuture, willBe(commonPoolThread()));
+ }
+
+ @SuppressWarnings("rawtypes")
+ @CartesianTest
+ void
recordViewFuturesFromInternalCallsAreNotResubmittedToContinuationsPool(
+ @Enum RecordViewAsyncOperation operation,
+ @Enum RecordViewKind kind
+ ) {
+ RecordView tableView = kind.view();
+
+ @SuppressWarnings("unchecked") CompletableFuture<Thread>
completerFuture = forcingSwitchFromUserThread(
+ () -> PublicApiThreading.doInternalCall(
+ () -> operation.executeOn(tableView, kind.context())
+ .thenApply(unused -> Thread.currentThread())
+ )
+ );
+
+ assertThat(completerFuture, willBe(anIgniteThread()));
+ }
+
+ private static RecordView<Record> plainRecordView() {
+ return testTable().recordView(Record.class);
+ }
+
+ private static RecordView<Tuple> binaryRecordView() {
+ return testTable().recordView();
+ }
+
+ private static RecordContext<Record> plainRecordContext() {
+ return new RecordContext<>(KEY_RECORD, new Record(KEY, "one"), new
Record(KEY, "two"));
+ }
+
+ private static RecordContext<Tuple> binaryRecordContext() {
+ return new RecordContext<>(KEY_RECORD.toKeyTuple(), new Record(KEY,
"one").toFullTuple(), new Record(KEY, "two").toFullTuple());
+ }
+
+ private enum KeyValueViewAsyncOperation {
+ GET_ASYNC((view, context) -> view.getAsync(null, context.key)),
+ GET_NULLABLE_ASYNC((view, context) -> view.getNullableAsync(null,
context.key)),
+ GET_OR_DEFAULT_ASYNC((view, context) -> view.getOrDefaultAsync(null,
context.key, context.anotherValue)),
+ GET_ALL_ASYNC((view, context) -> view.getAllAsync(null,
List.of(context.key))),
+ CONTAINS_ASYNC((view, context) -> view.containsAsync(null,
context.key)),
+ PUT_ASYNC((view, context) -> view.putAsync(null, context.key,
context.usualValue)),
+ PUT_ALL_ASYNC((view, context) -> view.putAllAsync(null,
Map.of(context.key, context.usualValue))),
+ GET_AND_PUT_ASYNC((view, context) -> view.getAndPutAsync(null,
context.key, context.usualValue)),
+ GET_NULLABLE_AND_PUT_ASYNC((view, context) ->
view.getNullableAndPutAsync(null, context.key, context.usualValue)),
+ PUT_IF_ABSENT_ASYNC((view, context) -> view.putIfAbsentAsync(null,
context.key, context.usualValue)),
+ REMOVE_ASYNC((view, context) -> view.removeAsync(null, context.key)),
+ REMOVE_EXACT_ASYNC((view, context) -> view.removeAsync(null,
context.key, context.usualValue)),
+ REMOVE_ALL_ASYNC((view, context) -> view.removeAllAsync(null,
List.of(context.key))),
+ GET_AND_REMOVE_ASYNC((view, context) -> view.getAndRemoveAsync(null,
context.key)),
+ GET_NULLABLE_AND_REMOVE_ASYNC((view, context) ->
view.getNullableAndRemoveAsync(null, context.key)),
+ REPLACE_ASYNC((view, context) -> view.replaceAsync(null, context.key,
context.usualValue)),
+ REPLACE_EXACT_ASYNC((view, context) -> view.replaceAsync(null,
context.key, context.usualValue, context.anotherValue)),
+ GET_AND_REPLACE_ASYNC((view, context) -> view.getAndReplaceAsync(null,
context.key, context.usualValue)),
+ GET_NULLABLE_AND_REPLACE_ASYNC((view, context) ->
view.getNullableAndReplaceAsync(null, context.key, context.usualValue)),
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ STREAM_DATA((view, context) -> {
+ CompletableFuture<?> future;
+ try (var publisher = new SimplePublisher()) {
+ future = view.streamData(publisher, null);
+ publisher.submit(Map.entry(context.key, context.usualValue));
+ }
+ return future;
+ }),
+ QUERY_ASYNC((view, context) -> view.queryAsync(null, null)),
+ QUERY_ASYNC_WITH_INDEX_NAME((view, context) -> view.queryAsync(null,
null, null)),
+ QUERY_ASYNC_WITH_INDEX_NAME_AND_OPTS((view, context) ->
view.queryAsync(null, null, null, null));
+
+ private final BiFunction<KeyValueView<Object, Object>,
KeyValueContext<Object, Object>, CompletableFuture<?>> action;
+
+ KeyValueViewAsyncOperation(BiFunction<KeyValueView<Object, Object>,
KeyValueContext<Object, Object>, CompletableFuture<?>> action) {
+ this.action = action;
+ }
+
+ <K, V> CompletableFuture<?> executeOn(KeyValueView<K, V> tableView,
KeyValueContext<K, V> context) {
+ return action.apply((KeyValueView<Object, Object>) tableView,
(KeyValueContext<Object, Object>) context);
+ }
+
+ boolean isGetNullable() {
+ switch (this) {
+ case GET_NULLABLE_ASYNC:
+ case GET_NULLABLE_AND_PUT_ASYNC:
+ case GET_NULLABLE_AND_REMOVE_ASYNC:
+ case GET_NULLABLE_AND_REPLACE_ASYNC:
+ return true;
+ default:
+ return false;
+ }
+ }
+ }
+
+ private static class KeyValueContext<K, V> {
+ final K key;
+ final V usualValue;
+ final V anotherValue;
+
+ private KeyValueContext(K key, V usualValue, V anotherValue) {
+ this.key = key;
+ this.usualValue = usualValue;
+ this.anotherValue = anotherValue;
+ }
+ }
+
+ @SuppressWarnings("rawtypes")
+ private enum KeyValueViewKind {
+ PLAIN, BINARY;
+
+ KeyValueView view() {
+ return this == PLAIN ? plainKeyValueView() : binaryKeyValueView();
+ }
+
+ KeyValueContext context() {
+ return this == PLAIN ? plainKeyValueContext() :
binaryKeyValueContext();
+ }
+
+ boolean supportsGetNullable() {
+ return this == PLAIN;
+ }
+ }
+
+ @SuppressWarnings({"FieldCanBeLocal", "unused"})
+ private static class Record {
+ private int id;
+ private String val;
+
+ private Record() {
+ }
+
+ private Record(int id, String val) {
+ this.id = id;
+ this.val = val;
+ }
+
+ Tuple toKeyTuple() {
+ return Tuple.create().set("id", id);
+ }
+
+ Tuple toFullTuple() {
+ return Tuple.create().set("id", id).set("val", val);
+ }
+ }
+
+ private enum RecordViewAsyncOperation {
+ GET_ASYNC((view, context) -> view.getAsync(null, context.keyRecord)),
+ GET_ALL_ASYNC((view, context) -> view.getAllAsync(null,
List.of(context.keyRecord))),
+ CONTAINS_ASYNC((view, context) -> view.containsAsync(null,
context.keyRecord)),
+ UPSERT_ASYNC((view, context) -> view.upsertAsync(null,
context.fullRecord)),
+ UPSERT_ALL_ASYNC((view, context) -> view.upsertAllAsync(null,
List.of(context.fullRecord))),
+ GET_AND_UPSERT_ASYNC((view, context) -> view.getAndUpsertAsync(null,
context.fullRecord)),
+ INSERT_ASYNC((view, context) -> view.insertAsync(null,
context.fullRecord)),
+ INSERT_ALL_ASYNC((view, context) -> view.insertAllAsync(null,
List.of(context.fullRecord))),
+ REPLACE_ASYNC((view, context) -> view.replaceAsync(null,
context.fullRecord)),
+ REPLACE_EXACT_ASYNC((view, context) -> view.replaceAsync(null,
context.fullRecord, context.anotherFullRecord)),
+ GET_AND_REPLACE_ASYNC((view, context) -> view.getAndReplaceAsync(null,
context.keyRecord)),
+ DELETE_ASYNC((view, context) -> view.deleteAsync(null,
context.keyRecord)),
+ DELETE_EXACT_ASYNC((view, context) -> view.deleteExactAsync(null,
context.fullRecord)),
+ GET_AND_DELETE_ASYNC((view, context) -> view.getAndDeleteAsync(null,
context.keyRecord)),
+ DELETE_ALL_ASYNC((view, context) -> view.deleteAllAsync(null,
List.of(context.keyRecord))),
+ DELETE_ALL_EXACT_ASYNC((view, context) ->
view.deleteAllExactAsync(null, List.of(context.keyRecord))),
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ STREAM_DATA((view, context) -> {
+ CompletableFuture<?> future;
+ try (var publisher = new SimplePublisher()) {
+ future = view.streamData(publisher, null);
+ publisher.submit(context.fullRecord);
+ }
+ return future;
+ }),
+ QUERY_ASYNC((view, context) -> view.queryAsync(null, null)),
+ QUERY_ASYNC_WITH_INDEX_NAME((view, context) -> view.queryAsync(null,
null, null)),
+ QUERY_ASYNC_WITH_INDEX_NAME_AND_OPTS((view, context) ->
view.queryAsync(null, null, null, null));
+
+ private final BiFunction<RecordView<Object>, RecordContext<Object>,
CompletableFuture<?>> action;
+
+ RecordViewAsyncOperation(BiFunction<RecordView<Object>,
RecordContext<Object>, CompletableFuture<?>> action) {
+ this.action = action;
+ }
+
+ <R> CompletableFuture<?> executeOn(RecordView<R> tableView,
RecordContext<R> context) {
+ return action.apply((RecordView<Object>) tableView,
(RecordContext<Object>) context);
+ }
+ }
+
+ private static class RecordContext<R> {
+ final R keyRecord;
+ final R fullRecord;
+ final R anotherFullRecord;
+
+ private RecordContext(R keyRecord, R fullRecord, R anotherFullRecord) {
+ this.keyRecord = keyRecord;
+ this.fullRecord = fullRecord;
+ this.anotherFullRecord = anotherFullRecord;
+ }
+ }
+
+ @SuppressWarnings("rawtypes")
+ private enum RecordViewKind {
+ PLAIN, BINARY;
+
+ RecordView view() {
+ return this == PLAIN ? plainRecordView() : binaryRecordView();
+ }
+
+ RecordContext context() {
+ return this == PLAIN ? plainRecordContext() :
binaryRecordContext();
+ }
+ }
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
index 7cf46edcdb..3ba9fa63c4 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
@@ -29,7 +29,10 @@ import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
import java.util.function.Function;
+import java.util.function.Supplier;
import org.apache.ignite.internal.lang.IgniteExceptionMapperUtil;
import org.apache.ignite.internal.marshaller.MarshallersProvider;
import org.apache.ignite.internal.schema.Column;
@@ -38,10 +41,11 @@ import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.table.criteria.CursorAdapter;
import org.apache.ignite.internal.table.criteria.QueryCriteriaAsyncCursor;
import org.apache.ignite.internal.table.criteria.SqlSerializer;
+import org.apache.ignite.internal.table.criteria.SqlSerializer.Builder;
import
org.apache.ignite.internal.table.distributed.replicator.InternalSchemaVersionMismatchException;
import org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
+import org.apache.ignite.internal.thread.PublicApiThreading;
import org.apache.ignite.internal.tx.InternalTransaction;
-import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.lang.AsyncCursor;
import org.apache.ignite.lang.Cursor;
import org.apache.ignite.sql.IgniteSql;
@@ -73,6 +77,8 @@ abstract class AbstractTableView<R> implements
CriteriaQuerySource<R> {
/** Marshallers provider. */
protected final MarshallersProvider marshallers;
+ private final Executor asyncContinuationExecutor;
+
/**
* Constructor.
*
@@ -81,15 +87,24 @@ abstract class AbstractTableView<R> implements
CriteriaQuerySource<R> {
* @param schemaReg Schema registry.
* @param sql Ignite SQL facade.
* @param marshallers Marshallers provider.
+ * @param asyncContinuationExecutor Executor to which execution will be
resubmitted when leaving asynchronous public API endpoints
+ * (to prevent the user from stealing Ignite threads).
*/
- AbstractTableView(InternalTable tbl, SchemaVersions schemaVersions,
SchemaRegistry schemaReg, IgniteSql sql,
- MarshallersProvider marshallers) {
+ AbstractTableView(
+ InternalTable tbl,
+ SchemaVersions schemaVersions,
+ SchemaRegistry schemaReg,
+ IgniteSql sql,
+ MarshallersProvider marshallers,
+ Executor asyncContinuationExecutor
+ ) {
this.tbl = tbl;
this.schemaVersions = schemaVersions;
this.sql = sql;
+ this.marshallers = marshallers;
+ this.asyncContinuationExecutor = asyncContinuationExecutor;
this.rowConverter = new TableViewRowConverter(schemaReg);
- this.marshallers = marshallers;
}
/**
@@ -99,19 +114,40 @@ abstract class AbstractTableView<R> implements
CriteriaQuerySource<R> {
* @param <T> Future result type.
* @return Future result.
*/
- protected final <T> T sync(CompletableFuture<T> fut) {
+ protected final <T> T sync(Supplier<CompletableFuture<T>> fut) {
+ // Enclose in 'internal call' to prevent resubmission to the async
continuation pool on future completion
+ // as such a resubmission is useless in the sync case and will just
increase latency.
+ PublicApiThreading.startInternalCall();
+
try {
- return fut.get();
+ return fut.get().get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Restore interrupt flag.
throw
sneakyThrow(IgniteExceptionMapperUtil.mapToPublicException(e));
} catch (ExecutionException e) {
- Throwable cause = ExceptionUtils.unwrapCause(e);
+ Throwable cause = unwrapCause(e);
throw sneakyThrow(cause);
+ } finally {
+ PublicApiThreading.endInternalCall();
}
}
+ /**
+ * Combines, in this order, the effects of {@link #withSchemaSync(Transaction, KvAction)},
{@link #preventThreadHijack(CompletableFuture)} and
+ * {@link
IgniteExceptionMapperUtil#convertToPublicFuture(CompletableFuture)}.
+ *
+ * @param <T> Type of the data the action returns.
+ * @param tx Transaction or {@code null}.
+ * @param action Action to execute; it is passed to {@link #withSchemaSync(Transaction, KvAction)} together with {@code tx}.
+ * @return Future of whatever the action returns, after it has been passed through the thread-hijack guard and the public-future conversion.
+ */
+ protected final <T> CompletableFuture<T> doOperation(@Nullable Transaction
tx, KvAction<T> action) {
+ CompletableFuture<T> future = preventThreadHijack(withSchemaSync(tx,
action));
+
+ return convertToPublicFuture(future);
+ }
+
/**
* Executes the provided KV action in the given transaction, maintaining
Schema Synchronization semantics: that is, before executing
* the action, a check is made to make sure that the current node has
schemas complete wrt timestamp corresponding to the operation
@@ -139,7 +175,7 @@ abstract class AbstractTableView<R> implements
CriteriaQuerySource<R> {
? schemaVersions.schemaVersionAtNow(tbl.tableId())
: schemaVersions.schemaVersionAt(((InternalTransaction)
tx).startTimestamp(), tbl.tableId());
- CompletableFuture<T> future = schemaVersionFuture
+ return schemaVersionFuture
.thenCompose(schemaVersion -> action.act(schemaVersion)
.handle((res, ex) -> {
if
(isOrCausedBy(InternalSchemaVersionMismatchException.class, ex)) {
@@ -155,8 +191,29 @@ abstract class AbstractTableView<R> implements
CriteriaQuerySource<R> {
}
}))
.thenCompose(identity());
+ }
- return convertToPublicFuture(future);
+ /**
+ * Prevents Ignite internal threads from being hijacked by the user code.
If that happened, the user code could block
+ * Ignite threads, hampering progress.
+ *
+ * <p>This is done by completing the future in the async continuation
thread pool if it would have been completed in an Ignite thread.
+ * This does not happen for synchronous operations as it's impossible to
hijack a thread using such operations.
+ *
+ * <p>The switch to the async continuation pool is also skipped when it's
known that the call is made by another Ignite component
+ * and not by the user, or when the future is already complete at the time of the check.
+ *
+ * @param originalFuture Operation future.
+ * @return The original future if the switch is skipped; otherwise, a future that will be completed in the async continuation thread
pool ({@link ForkJoinPool#commonPool()} by default).
+ */
+ protected final <T> CompletableFuture<T>
preventThreadHijack(CompletableFuture<T> originalFuture) {
+ if (originalFuture.isDone() || PublicApiThreading.inInternalCall()) {
+ return originalFuture;
+ }
+
+ // The future is not complete yet, so it will be completed on an
Ignite thread, so we need to complete the user-facing future
+ // in the continuation pool.
+ return originalFuture.whenCompleteAsync((res, ex) -> {},
asyncContinuationExecutor);
}
/**
@@ -189,7 +246,7 @@ abstract class AbstractTableView<R> implements
CriteriaQuerySource<R> {
/** {@inheritDoc} */
@Override
public Cursor<R> query(@Nullable Transaction tx, @Nullable Criteria
criteria, @Nullable String indexName, CriteriaQueryOptions opts) {
- return new CursorAdapter<>(sync(queryAsync(tx, criteria, indexName,
opts)));
+ return new CursorAdapter<>(sync(() -> queryAsync(tx, criteria,
indexName, opts)));
}
/** {@inheritDoc} */
@@ -202,10 +259,10 @@ abstract class AbstractTableView<R> implements
CriteriaQuerySource<R> {
) {
CriteriaQueryOptions opts0 = opts == null ?
CriteriaQueryOptions.DEFAULT : opts;
- return withSchemaSync(tx, (schemaVersion) -> {
+ CompletableFuture<AsyncCursor<R>> future = doOperation(tx,
(schemaVersion) -> {
SchemaDescriptor schema =
rowConverter.registry().schema(schemaVersion);
- SqlSerializer ser = new SqlSerializer.Builder()
+ SqlSerializer ser = new Builder()
.tableName(tbl.name())
.columns(schema.columnNames())
.indexName(indexName)
@@ -228,19 +285,20 @@ abstract class AbstractTableView<R> implements
CriteriaQuerySource<R> {
session.closeAsync();
}
});
- })
- .exceptionally(th -> {
- throw new
CompletionException(mapToPublicCriteriaException(unwrapCause(th)));
- });
+ });
+
+ return future.exceptionally(th -> {
+ throw new
CompletionException(mapToPublicCriteriaException(unwrapCause(th)));
+ });
}
/**
* Action representing some KV operation. When executed, the action is
supplied with schema version corresponding
- * to the operation timestamp (see {@link #withSchemaSync(Transaction,
KvAction)} for details).
+ * to the operation timestamp (see {@link #doOperation(Transaction,
KvAction)} for details).
*
* @param <R> Type of the result.
- * @see #withSchemaSync(Transaction, KvAction)
+ * @see #doOperation(Transaction, KvAction)
*/
@FunctionalInterface
protected interface KvAction<R> {
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java
index ff6a58ccaa..65ede03a30 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.table;
+import static
org.apache.ignite.internal.lang.IgniteExceptionMapperUtil.convertToPublicFuture;
+
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
@@ -26,6 +28,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
import java.util.concurrent.Flow.Publisher;
import java.util.function.Function;
import org.apache.ignite.internal.lang.IgniteBiTuple;
@@ -69,17 +72,20 @@ public class KeyValueBinaryViewImpl extends
AbstractTableView<Entry<Tuple, Tuple
* @param tbl Table storage.
* @param schemaReg Schema registry.
* @param schemaVersions Schema versions access.
- * @param marshallers Marshallers provider.
* @param sql Ignite SQL facade.
+ * @param marshallers Marshallers provider.
+ * @param asyncContinuationExecutor Executor to which execution will be
resubmitted when leaving asynchronous public API
+ * endpoints (so as to prevent the user from stealing Ignite
threads).
*/
public KeyValueBinaryViewImpl(
InternalTable tbl,
SchemaRegistry schemaReg,
SchemaVersions schemaVersions,
+ IgniteSql sql,
MarshallersProvider marshallers,
- IgniteSql sql
+ Executor asyncContinuationExecutor
) {
- super(tbl, schemaVersions, schemaReg, sql, marshallers);
+ super(tbl, schemaVersions, schemaReg, sql, marshallers,
asyncContinuationExecutor);
marshallerCache = new TupleMarshallerCache(schemaReg);
}
@@ -87,7 +93,7 @@ public class KeyValueBinaryViewImpl extends
AbstractTableView<Entry<Tuple, Tuple
/** {@inheritDoc} */
@Override
public Tuple get(@Nullable Transaction tx, Tuple key) {
- return sync(getAsync(tx, key));
+ return sync(() -> getAsync(tx, key));
}
/** {@inheritDoc} */
@@ -95,7 +101,7 @@ public class KeyValueBinaryViewImpl extends
AbstractTableView<Entry<Tuple, Tuple
public CompletableFuture<Tuple> getAsync(@Nullable Transaction tx, Tuple
key) {
Objects.requireNonNull(key);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
Row keyRow = marshal(key, null, schemaVersion);
return tbl.get(keyRow, (InternalTransaction) tx).thenApply(row ->
unmarshalValue(row, schemaVersion));
@@ -125,7 +131,7 @@ public class KeyValueBinaryViewImpl extends
AbstractTableView<Entry<Tuple, Tuple
/** {@inheritDoc} */
@Override
public Tuple getOrDefault(@Nullable Transaction tx, Tuple key, Tuple
defaultValue) {
- return sync(getOrDefaultAsync(tx, key, defaultValue));
+ return sync(() -> getOrDefaultAsync(tx, key, defaultValue));
}
/** {@inheritDoc} */
@@ -133,7 +139,7 @@ public class KeyValueBinaryViewImpl extends
AbstractTableView<Entry<Tuple, Tuple
public CompletableFuture<Tuple> getOrDefaultAsync(@Nullable Transaction
tx, Tuple key, Tuple defaultValue) {
Objects.requireNonNull(key);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
BinaryRowEx keyRow = marshal(key, null, schemaVersion);
return tbl.get(keyRow, (InternalTransaction) tx)
@@ -144,7 +150,7 @@ public class KeyValueBinaryViewImpl extends
AbstractTableView<Entry<Tuple, Tuple
/** {@inheritDoc} */
@Override
public Map<Tuple, Tuple> getAll(@Nullable Transaction tx,
Collection<Tuple> keys) {
- return sync(getAllAsync(tx, keys));
+ return sync(() -> getAllAsync(tx, keys));
}
/** {@inheritDoc} */
@@ -152,7 +158,7 @@ public class KeyValueBinaryViewImpl extends
AbstractTableView<Entry<Tuple, Tuple
public CompletableFuture<Map<Tuple, Tuple>> getAllAsync(@Nullable
Transaction tx, Collection<Tuple> keys) {
checkKeysForNulls(keys);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
List<BinaryRowEx> keyRows = marshalKeys(keys, schemaVersion);
return tbl.getAll(keyRows, (InternalTransaction)
tx).thenApply(rows -> unmarshalValues(rows, schemaVersion));
@@ -182,7 +188,7 @@ public class KeyValueBinaryViewImpl extends
AbstractTableView<Entry<Tuple, Tuple
/** {@inheritDoc} */
@Override
public void put(@Nullable Transaction tx, Tuple key, Tuple val) {
- sync(putAsync(tx, key, val));
+ sync(() -> putAsync(tx, key, val));
}
/** {@inheritDoc} */
@@ -191,7 +197,7 @@ public class KeyValueBinaryViewImpl extends
AbstractTableView<Entry<Tuple, Tuple
Objects.requireNonNull(key);
Objects.requireNonNull(val);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
Row row = marshal(key, val, schemaVersion);
return tbl.upsert(row, (InternalTransaction) tx);
@@ -201,7 +207,7 @@ public class KeyValueBinaryViewImpl extends
AbstractTableView<Entry<Tuple, Tuple
/** {@inheritDoc} */
@Override
public void putAll(@Nullable Transaction tx, Map<Tuple, Tuple> pairs) {
- sync(putAllAsync(tx, pairs));
+ sync(() -> putAllAsync(tx, pairs));
}
/** {@inheritDoc} */
@@ -213,7 +219,7 @@ public class KeyValueBinaryViewImpl extends
AbstractTableView<Entry<Tuple, Tuple
Objects.requireNonNull(entry.getValue());
}
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
return tbl.upsertAll(marshalPairs(pairs.entrySet(), schemaVersion,
null), (InternalTransaction) tx);
});
}
@@ -221,7 +227,7 @@ public class KeyValueBinaryViewImpl extends
AbstractTableView<Entry<Tuple, Tuple
/** {@inheritDoc} */
@Override
public Tuple getAndPut(@Nullable Transaction tx, Tuple key, Tuple val) {
- return sync(getAndPutAsync(tx, key, val));
+ return sync(() -> getAndPutAsync(tx, key, val));
}
/** {@inheritDoc} */
@@ -230,7 +236,7 @@ public class KeyValueBinaryViewImpl extends
AbstractTableView<Entry<Tuple, Tuple
Objects.requireNonNull(key);
Objects.requireNonNull(val);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
Row row = marshal(key, val, schemaVersion);
return tbl.getAndUpsert(row, (InternalTransaction)
tx).thenApply(resultRow -> unmarshalValue(resultRow, schemaVersion));
@@ -261,7 +267,7 @@ public class KeyValueBinaryViewImpl extends
AbstractTableView<Entry<Tuple, Tuple
/** {@inheritDoc} */
@Override
public boolean putIfAbsent(@Nullable Transaction tx, Tuple key, Tuple val)
{
- return sync(putIfAbsentAsync(tx, key, val));
+ return sync(() -> putIfAbsentAsync(tx, key, val));
}
/** {@inheritDoc} */
@@ -270,7 +276,7 @@ public class KeyValueBinaryViewImpl extends
AbstractTableView<Entry<Tuple, Tuple
Objects.requireNonNull(key);
Objects.requireNonNull(val);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
Row row = marshal(key, val, schemaVersion);
return tbl.insert(row, (InternalTransaction) tx);
@@ -280,13 +286,13 @@ public class KeyValueBinaryViewImpl extends
AbstractTableView<Entry<Tuple, Tuple
/** {@inheritDoc} */
@Override
public boolean remove(@Nullable Transaction tx, Tuple key) {
- return sync(removeAsync(tx, key));
+ return sync(() -> removeAsync(tx, key));
}
/** {@inheritDoc} */
@Override
public boolean remove(@Nullable Transaction tx, Tuple key, Tuple val) {
- return sync(removeAsync(tx, key, val));
+ return sync(() -> removeAsync(tx, key, val));
}
/** {@inheritDoc} */
@@ -294,7 +300,7 @@ public class KeyValueBinaryViewImpl extends
AbstractTableView<Entry<Tuple, Tuple
public CompletableFuture<Boolean> removeAsync(@Nullable Transaction tx,
Tuple key) {
Objects.requireNonNull(key);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
Row row = marshal(key, null, schemaVersion);
return tbl.delete(row, (InternalTransaction) tx);
@@ -307,7 +313,7 @@ public class KeyValueBinaryViewImpl extends
AbstractTableView<Entry<Tuple, Tuple
Objects.requireNonNull(key);
Objects.requireNonNull(val);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
Row row = marshal(key, val, schemaVersion);
return tbl.deleteExact(row, (InternalTransaction) tx);
@@ -317,7 +323,7 @@ public class KeyValueBinaryViewImpl extends
AbstractTableView<Entry<Tuple, Tuple
/** {@inheritDoc} */
@Override
public Collection<Tuple> removeAll(@Nullable Transaction tx,
Collection<Tuple> keys) {
- return sync(removeAllAsync(tx, keys));
+ return sync(() -> removeAllAsync(tx, keys));
}
/** {@inheritDoc} */
@@ -325,7 +331,7 @@ public class KeyValueBinaryViewImpl extends
AbstractTableView<Entry<Tuple, Tuple
public CompletableFuture<Collection<Tuple>> removeAllAsync(@Nullable
Transaction tx, Collection<Tuple> keys) {
checkKeysForNulls(keys);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
List<BinaryRowEx> keyRows = marshalKeys(keys, schemaVersion);
return tbl.deleteAll(keyRows, (InternalTransaction)
tx).thenApply(rows -> unmarshalKeys(rows, schemaVersion));
@@ -337,7 +343,7 @@ public class KeyValueBinaryViewImpl extends
AbstractTableView<Entry<Tuple, Tuple
public Tuple getAndRemove(@Nullable Transaction tx, Tuple key) {
Objects.requireNonNull(key);
- return sync(getAndRemoveAsync(tx, key));
+ return sync(() -> getAndRemoveAsync(tx, key));
}
/** {@inheritDoc} */
@@ -345,7 +351,7 @@ public class KeyValueBinaryViewImpl extends
AbstractTableView<Entry<Tuple, Tuple
public CompletableFuture<Tuple> getAndRemoveAsync(@Nullable Transaction
tx, Tuple key) {
Objects.requireNonNull(key);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
return tbl.getAndDelete(marshal(key, null, schemaVersion),
(InternalTransaction) tx)
.thenApply(row -> unmarshalValue(row, schemaVersion));
});
@@ -374,13 +380,13 @@ public class KeyValueBinaryViewImpl extends
AbstractTableView<Entry<Tuple, Tuple
/** {@inheritDoc} */
@Override
public boolean replace(@Nullable Transaction tx, Tuple key, Tuple val) {
- return sync(replaceAsync(tx, key, val));
+ return sync(() -> replaceAsync(tx, key, val));
}
/** {@inheritDoc} */
@Override
public boolean replace(@Nullable Transaction tx, Tuple key, Tuple oldVal,
Tuple newVal) {
- return sync(replaceAsync(tx, key, oldVal, newVal));
+ return sync(() -> replaceAsync(tx, key, oldVal, newVal));
}
/** {@inheritDoc} */
@@ -389,7 +395,7 @@ public class KeyValueBinaryViewImpl extends
AbstractTableView<Entry<Tuple, Tuple
Objects.requireNonNull(key);
Objects.requireNonNull(val);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
Row row = marshal(key, val, schemaVersion);
return tbl.replace(row, (InternalTransaction) tx);
@@ -408,7 +414,7 @@ public class KeyValueBinaryViewImpl extends
AbstractTableView<Entry<Tuple, Tuple
Objects.requireNonNull(oldVal);
Objects.requireNonNull(newVal);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
Row oldRow = marshal(key, oldVal, schemaVersion);
Row newRow = marshal(key, newVal, schemaVersion);
@@ -419,7 +425,7 @@ public class KeyValueBinaryViewImpl extends
AbstractTableView<Entry<Tuple, Tuple
/** {@inheritDoc} */
@Override
public Tuple getAndReplace(@Nullable Transaction tx, Tuple key, Tuple val)
{
- return sync(getAndReplaceAsync(tx, key, val));
+ return sync(() -> getAndReplaceAsync(tx, key, val));
}
/** {@inheritDoc} */
@@ -428,7 +434,7 @@ public class KeyValueBinaryViewImpl extends
AbstractTableView<Entry<Tuple, Tuple
Objects.requireNonNull(key);
Objects.requireNonNull(val);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
return tbl.getAndReplace(marshal(key, val, schemaVersion),
(InternalTransaction) tx)
.thenApply(row -> unmarshalValue(row, schemaVersion));
});
@@ -450,8 +456,11 @@ public class KeyValueBinaryViewImpl extends
AbstractTableView<Entry<Tuple, Tuple
* @throws UnsupportedOperationException unconditionally.
*/
@Override
- public CompletableFuture<NullableValue<Tuple>>
getNullableAndReplaceAsync(@Nullable Transaction tx, Tuple key,
- Tuple val) {
+ public CompletableFuture<NullableValue<Tuple>> getNullableAndReplaceAsync(
+ @Nullable Transaction tx,
+ Tuple key,
+ Tuple val
+ ) {
throw new UnsupportedOperationException("Binary view doesn't allow
null tuples.");
}
@@ -558,9 +567,11 @@ public class KeyValueBinaryViewImpl extends
AbstractTableView<Entry<Tuple, Tuple
StreamerBatchSender<Entry<Tuple, Tuple>, Integer> batchSender =
(partitionId, items, deleted) ->
withSchemaSync(
null,
- schemaVersion ->
this.tbl.updateAll(marshalPairs(items, schemaVersion, deleted), deleted,
partitionId));
+ schemaVersion ->
this.tbl.updateAll(marshalPairs(items, schemaVersion, deleted), deleted,
partitionId)
+ );
- return DataStreamer.streamData(publisher, options, batchSender,
partitioner);
+ CompletableFuture<Void> future = DataStreamer.streamData(publisher,
options, batchSender, partitioner);
+ return convertToPublicFuture(preventThreadHijack(future));
}
private List<BinaryRowEx> marshalPairs(Collection<Entry<Tuple, Tuple>>
pairs, int schemaVersion, @Nullable BitSet deleted) {
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
index 8cc00ffa70..3c4d74c27c 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.table;
+import static
org.apache.ignite.internal.lang.IgniteExceptionMapperUtil.convertToPublicFuture;
+
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
@@ -26,6 +28,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
import java.util.concurrent.Flow.Publisher;
import java.util.function.Function;
import org.apache.ignite.internal.lang.IgniteBiTuple;
@@ -82,8 +85,10 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView<Entry<K, V>> imple
* @param tbl Table storage.
* @param schemaRegistry Schema registry.
* @param schemaVersions Schema versions access.
- * @param marshallers Marshallers provider.
* @param sql Ignite SQL facade.
+ * @param marshallers Marshallers provider.
+ * @param asyncContinuationExecutor Executor to which execution will be
resubmitted when leaving asynchronous public API
+ * endpoints (so as to prevent the user from stealing Ignite
threads).
* @param keyMapper Key class mapper.
* @param valueMapper Value class mapper.
*/
@@ -91,12 +96,13 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView<Entry<K, V>> imple
InternalTable tbl,
SchemaRegistry schemaRegistry,
SchemaVersions schemaVersions,
- MarshallersProvider marshallers,
IgniteSql sql,
+ MarshallersProvider marshallers,
+ Executor asyncContinuationExecutor,
Mapper<K> keyMapper,
Mapper<V> valueMapper
) {
- super(tbl, schemaVersions, schemaRegistry, sql, marshallers);
+ super(tbl, schemaVersions, schemaRegistry, sql, marshallers,
asyncContinuationExecutor);
this.keyMapper = keyMapper;
this.valueMapper = valueMapper;
@@ -107,7 +113,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView<Entry<K, V>> imple
/** {@inheritDoc} */
@Override
public V get(@Nullable Transaction tx, K key) {
- return sync(getAsync(tx, key));
+ return sync(() -> getAsync(tx, key));
}
/** {@inheritDoc} */
@@ -115,7 +121,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView<Entry<K, V>> imple
public CompletableFuture<V> getAsync(@Nullable Transaction tx, K key) {
Objects.requireNonNull(key);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
BinaryRowEx keyRow = marshal(key, schemaVersion);
return tbl.get(keyRow, (InternalTransaction)
tx).thenApply(binaryRow -> unmarshalValue(binaryRow, schemaVersion));
@@ -125,7 +131,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView<Entry<K, V>> imple
/** {@inheritDoc} */
@Override
public NullableValue<V> getNullable(@Nullable Transaction tx, K key) {
- return sync(getNullableAsync(tx, key));
+ return sync(() -> getNullableAsync(tx, key));
}
/** {@inheritDoc} */
@@ -133,7 +139,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView<Entry<K, V>> imple
public CompletableFuture<NullableValue<V>> getNullableAsync(@Nullable
Transaction tx, K key) {
Objects.requireNonNull(key);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
BinaryRowEx keyRow = marshal(key, schemaVersion);
return tbl.get(keyRow, (InternalTransaction) tx)
@@ -144,7 +150,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView<Entry<K, V>> imple
/** {@inheritDoc} */
@Override
public V getOrDefault(@Nullable Transaction tx, K key, V defaultValue) {
- return sync(getOrDefaultAsync(tx, key, defaultValue));
+ return sync(() -> getOrDefaultAsync(tx, key, defaultValue));
}
/** {@inheritDoc} */
@@ -152,7 +158,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView<Entry<K, V>> imple
public CompletableFuture<V> getOrDefaultAsync(@Nullable Transaction tx, K
key, V defaultValue) {
Objects.requireNonNull(key);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
BinaryRowEx keyRow = marshal(key, schemaVersion);
return tbl.get(keyRow, (InternalTransaction) tx)
@@ -163,7 +169,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView<Entry<K, V>> imple
/** {@inheritDoc} */
@Override
public Map<K, V> getAll(@Nullable Transaction tx, Collection<K> keys) {
- return sync(getAllAsync(tx, keys));
+ return sync(() -> getAllAsync(tx, keys));
}
/** {@inheritDoc} */
@@ -171,7 +177,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView<Entry<K, V>> imple
public CompletableFuture<Map<K, V>> getAllAsync(@Nullable Transaction tx,
Collection<K> keys) {
checkKeysForNulls(keys);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
Collection<BinaryRowEx> rows = marshal(keys, schemaVersion);
return tbl.getAll(rows, (InternalTransaction)
tx).thenApply(resultRows -> unmarshalPairs(resultRows, schemaVersion));
@@ -189,7 +195,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView<Entry<K, V>> imple
/** {@inheritDoc} */
@Override
public boolean contains(@Nullable Transaction tx, K key) {
- return sync(containsAsync(tx, key));
+ return sync(() -> containsAsync(tx, key));
}
/** {@inheritDoc} */
@@ -197,7 +203,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView<Entry<K, V>> imple
public CompletableFuture<Boolean> containsAsync(@Nullable Transaction tx,
K key) {
Objects.requireNonNull(key);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
BinaryRowEx keyRow = marshal(key, schemaVersion);
return tbl.get(keyRow, (InternalTransaction)
tx).thenApply(Objects::nonNull);
@@ -207,7 +213,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView<Entry<K, V>> imple
/** {@inheritDoc} */
@Override
public void put(@Nullable Transaction tx, K key, @Nullable V val) {
- sync(putAsync(tx, key, val));
+ sync(() -> putAsync(tx, key, val));
}
/** {@inheritDoc} */
@@ -215,7 +221,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView<Entry<K, V>> imple
public CompletableFuture<Void> putAsync(@Nullable Transaction tx, K key,
@Nullable V val) {
Objects.requireNonNull(key);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
BinaryRowEx row = marshal(key, val, schemaVersion);
return tbl.upsert(row, (InternalTransaction) tx);
@@ -225,7 +231,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView<Entry<K, V>> imple
/** {@inheritDoc} */
@Override
public void putAll(@Nullable Transaction tx, Map<K, V> pairs) {
- sync(putAllAsync(tx, pairs));
+ sync(() -> putAllAsync(tx, pairs));
}
/** {@inheritDoc} */
@@ -236,7 +242,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView<Entry<K, V>> imple
Objects.requireNonNull(entry.getKey());
}
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
Collection<BinaryRowEx> rows = marshalPairs(pairs.entrySet(),
schemaVersion, null);
return tbl.upsertAll(rows, (InternalTransaction) tx);
@@ -246,7 +252,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView<Entry<K, V>> imple
/** {@inheritDoc} */
@Override
public V getAndPut(@Nullable Transaction tx, K key, @Nullable V val) {
- return sync(getAndPutAsync(tx, key, val));
+ return sync(() -> getAndPutAsync(tx, key, val));
}
/** {@inheritDoc} */
@@ -255,7 +261,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView<Entry<K, V>> imple
Objects.requireNonNull(key);
Objects.requireNonNull(val);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
return tbl.getAndUpsert(marshal(key, val, schemaVersion),
(InternalTransaction) tx)
.thenApply(binaryRow -> unmarshalValue(binaryRow,
schemaVersion));
});
@@ -264,7 +270,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView<Entry<K, V>> imple
/** {@inheritDoc} */
@Override
public NullableValue<V> getNullableAndPut(@Nullable Transaction tx, K key,
@Nullable V val) {
- return sync(getNullableAndPutAsync(tx, key, val));
+ return sync(() -> getNullableAndPutAsync(tx, key, val));
}
/** {@inheritDoc} */
@@ -272,7 +278,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView<Entry<K, V>> imple
public CompletableFuture<NullableValue<V>>
getNullableAndPutAsync(@Nullable Transaction tx, K key, @Nullable V val) {
Objects.requireNonNull(key);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
BinaryRowEx row = marshal(key, val, schemaVersion);
return tbl.getAndUpsert(row, (InternalTransaction) tx)
@@ -283,7 +289,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView<Entry<K, V>> imple
/** {@inheritDoc} */
@Override
public boolean putIfAbsent(@Nullable Transaction tx, K key, @Nullable V
val) {
- return sync(putIfAbsentAsync(tx, key, val));
+ return sync(() -> putIfAbsentAsync(tx, key, val));
}
/** {@inheritDoc} */
@@ -291,7 +297,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView<Entry<K, V>> imple
public CompletableFuture<Boolean> putIfAbsentAsync(@Nullable Transaction
tx, K key, @Nullable V val) {
Objects.requireNonNull(key);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
BinaryRowEx row = marshal(key, val, schemaVersion);
return tbl.insert(row, (InternalTransaction) tx);
@@ -301,13 +307,13 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView<Entry<K, V>> imple
/** {@inheritDoc} */
@Override
public boolean remove(@Nullable Transaction tx, K key) {
- return sync(removeAsync(tx, key));
+ return sync(() -> removeAsync(tx, key));
}
/** {@inheritDoc} */
@Override
public boolean remove(@Nullable Transaction tx, K key, @Nullable V val) {
- return sync(removeAsync(tx, key, val));
+ return sync(() -> removeAsync(tx, key, val));
}
/** {@inheritDoc} */
@@ -315,7 +321,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView<Entry<K, V>> imple
public CompletableFuture<Boolean> removeAsync(@Nullable Transaction tx, K
key) {
Objects.requireNonNull(key);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
BinaryRowEx row = marshal(key, schemaVersion);
return tbl.delete(row, (InternalTransaction) tx);
@@ -327,7 +333,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView<Entry<K, V>> imple
public CompletableFuture<Boolean> removeAsync(@Nullable Transaction tx, K
key, @Nullable V val) {
Objects.requireNonNull(key);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
BinaryRowEx row = marshal(key, val, schemaVersion);
return tbl.deleteExact(row, (InternalTransaction) tx);
@@ -337,7 +343,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView<Entry<K, V>> imple
/** {@inheritDoc} */
@Override
public Collection<K> removeAll(@Nullable Transaction tx, Collection<K>
keys) {
- return sync(removeAllAsync(tx, keys));
+ return sync(() -> removeAllAsync(tx, keys));
}
/** {@inheritDoc} */
@@ -345,7 +351,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView<Entry<K, V>> imple
public CompletableFuture<Collection<K>> removeAllAsync(@Nullable
Transaction tx, Collection<K> keys) {
checkKeysForNulls(keys);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
Collection<BinaryRowEx> rows = marshal(keys, schemaVersion);
return tbl.deleteAll(rows, (InternalTransaction)
tx).thenApply(resultRows -> unmarshalKeys(resultRows, schemaVersion));
@@ -355,7 +361,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView<Entry<K, V>> imple
/** {@inheritDoc} */
@Override
public V getAndRemove(@Nullable Transaction tx, K key) {
- return sync(getAndRemoveAsync(tx, key));
+ return sync(() -> getAndRemoveAsync(tx, key));
}
/** {@inheritDoc} */
@@ -363,7 +369,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView<Entry<K, V>> imple
public CompletableFuture<V> getAndRemoveAsync(@Nullable Transaction tx, K
key) {
Objects.requireNonNull(key);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
BinaryRowEx keyRow = marshal(key, schemaVersion);
return tbl.getAndDelete(keyRow, (InternalTransaction) tx)
@@ -374,7 +380,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView<Entry<K, V>> imple
/** {@inheritDoc} */
@Override
public NullableValue<V> getNullableAndRemove(@Nullable Transaction tx, K
key) {
- return sync(getNullableAndRemoveAsync(tx, key));
+ return sync(() -> getNullableAndRemoveAsync(tx, key));
}
/** {@inheritDoc} */
@@ -382,7 +388,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView<Entry<K, V>> imple
public CompletableFuture<NullableValue<V>>
getNullableAndRemoveAsync(@Nullable Transaction tx, K key) {
Objects.requireNonNull(key);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
BinaryRowEx keyRow = marshal(key, schemaVersion);
return tbl.getAndDelete(keyRow, (InternalTransaction) tx)
@@ -393,13 +399,13 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView<Entry<K, V>> imple
/** {@inheritDoc} */
@Override
public boolean replace(@Nullable Transaction tx, K key, @Nullable V val) {
- return sync(replaceAsync(tx, key, val));
+ return sync(() -> replaceAsync(tx, key, val));
}
/** {@inheritDoc} */
@Override
public boolean replace(@Nullable Transaction tx, K key, @Nullable V
oldVal, @Nullable V newVal) {
- return sync(replaceAsync(tx, key, oldVal, newVal));
+ return sync(() -> replaceAsync(tx, key, oldVal, newVal));
}
/** {@inheritDoc} */
@@ -407,7 +413,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView<Entry<K, V>> imple
public CompletableFuture<Boolean> replaceAsync(@Nullable Transaction tx, K
key, @Nullable V val) {
Objects.requireNonNull(key);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
BinaryRowEx row = marshal(key, val, schemaVersion);
return tbl.replace(row, (InternalTransaction) tx);
@@ -419,7 +425,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView<Entry<K, V>> imple
public CompletableFuture<Boolean> replaceAsync(@Nullable Transaction tx, K
key, @Nullable V oldVal, @Nullable V newVal) {
Objects.requireNonNull(key);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
BinaryRowEx oldRow = marshal(key, oldVal, schemaVersion);
BinaryRowEx newRow = marshal(key, newVal, schemaVersion);
@@ -430,7 +436,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView<Entry<K, V>> imple
/** {@inheritDoc} */
@Override
public V getAndReplace(@Nullable Transaction tx, K key, @Nullable V val) {
- return sync(getAndReplaceAsync(tx, key, val));
+ return sync(() -> getAndReplaceAsync(tx, key, val));
}
/** {@inheritDoc} */
@@ -439,7 +445,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView<Entry<K, V>> imple
Objects.requireNonNull(key);
Objects.requireNonNull(val);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
return tbl.getAndReplace(marshal(key, val, schemaVersion),
(InternalTransaction) tx)
.thenApply(binaryRow -> unmarshalValue(binaryRow,
schemaVersion));
});
@@ -448,7 +454,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView<Entry<K, V>> imple
/** {@inheritDoc} */
@Override
public NullableValue<V> getNullableAndReplace(@Nullable Transaction tx, K
key, @Nullable V val) {
- return sync(getNullableAndReplaceAsync(tx, key, val));
+ return sync(() -> getNullableAndReplaceAsync(tx, key, val));
}
/** {@inheritDoc} */
@@ -456,7 +462,7 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView<Entry<K, V>> imple
public CompletableFuture<NullableValue<V>>
getNullableAndReplaceAsync(@Nullable Transaction tx, K key, @Nullable V val) {
Objects.requireNonNull(key);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
BinaryRowEx row = marshal(key, val, schemaVersion);
return tbl.getAndReplace(row, (InternalTransaction) tx)
@@ -700,9 +706,11 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView<Entry<K, V>> imple
StreamerBatchSender<Entry<K, V>, Integer> batchSender = (partitionId,
items, deleted) ->
withSchemaSync(
null,
- schemaVersion ->
this.tbl.updateAll(marshalPairs(items, schemaVersion, deleted), deleted,
partitionId));
+ schemaVersion ->
this.tbl.updateAll(marshalPairs(items, schemaVersion, deleted), deleted,
partitionId)
+ );
- return DataStreamer.streamData(publisher, options, batchSender,
partitioner);
+ CompletableFuture<Void> future = DataStreamer.streamData(publisher,
options, batchSender, partitioner);
+ return convertToPublicFuture(preventThreadHijack(future));
}
/** {@inheritDoc} */
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
index d62cce9023..84fa6db137 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.table;
+import static
org.apache.ignite.internal.lang.IgniteExceptionMapperUtil.convertToPublicFuture;
+
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
@@ -24,6 +26,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
import java.util.concurrent.Flow.Publisher;
import org.apache.ignite.internal.marshaller.MarshallersProvider;
import org.apache.ignite.internal.schema.BinaryRow;
@@ -57,17 +60,20 @@ public class RecordBinaryViewImpl extends
AbstractTableView<Tuple> implements Re
* @param tbl The table.
* @param schemaRegistry Table schema registry.
* @param schemaVersions Schema versions access.
- * @param marshallers Marshallers provider.
* @param sql Ignite SQL facade.
+ * @param marshallers Marshallers provider.
+ * @param asyncContinuationExecutor Executor to which execution will be
resubmitted when leaving asynchronous public API
+ * endpoints (so as to prevent the user from stealing Ignite
threads).
*/
public RecordBinaryViewImpl(
InternalTable tbl,
SchemaRegistry schemaRegistry,
SchemaVersions schemaVersions,
+ IgniteSql sql,
MarshallersProvider marshallers,
- IgniteSql sql
+ Executor asyncContinuationExecutor
) {
- super(tbl, schemaVersions, schemaRegistry, sql, marshallers);
+ super(tbl, schemaVersions, schemaRegistry, sql, marshallers,
asyncContinuationExecutor);
marshallerCache = new TupleMarshallerCache(schemaRegistry);
}
@@ -75,7 +81,7 @@ public class RecordBinaryViewImpl extends
AbstractTableView<Tuple> implements Re
/** {@inheritDoc} */
@Override
public Tuple get(@Nullable Transaction tx, Tuple keyRec) {
- return sync(getAsync(tx, keyRec));
+ return sync(() -> getAsync(tx, keyRec));
}
/** {@inheritDoc} */
@@ -83,7 +89,7 @@ public class RecordBinaryViewImpl extends
AbstractTableView<Tuple> implements Re
public CompletableFuture<Tuple> getAsync(@Nullable Transaction tx, Tuple
keyRec) {
Objects.requireNonNull(keyRec);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
Row keyRow = marshal(keyRec, schemaVersion, true); // Convert to
portable format to pass TX/storage layer.
return tbl.get(keyRow, (InternalTransaction) tx).thenApply(row ->
wrap(row, schemaVersion));
@@ -101,14 +107,14 @@ public class RecordBinaryViewImpl extends
AbstractTableView<Tuple> implements Re
@Override
public List<Tuple> getAll(@Nullable Transaction tx, Collection<Tuple>
keyRecs) {
- return sync(getAllAsync(tx, keyRecs));
+ return sync(() -> getAllAsync(tx, keyRecs));
}
@Override
public CompletableFuture<List<Tuple>> getAllAsync(@Nullable Transaction
tx, Collection<Tuple> keyRecs) {
Objects.requireNonNull(keyRecs);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
return tbl.getAll(mapToBinary(keyRecs, schemaVersion, true),
(InternalTransaction) tx)
.thenApply(binaryRows -> wrap(binaryRows, schemaVersion,
true));
});
@@ -117,7 +123,7 @@ public class RecordBinaryViewImpl extends
AbstractTableView<Tuple> implements Re
/** {@inheritDoc} */
@Override
public boolean contains(@Nullable Transaction tx, Tuple keyRec) {
- return sync(containsAsync(tx, keyRec));
+ return sync(() -> containsAsync(tx, keyRec));
}
/** {@inheritDoc} */
@@ -125,7 +131,7 @@ public class RecordBinaryViewImpl extends
AbstractTableView<Tuple> implements Re
public CompletableFuture<Boolean> containsAsync(@Nullable Transaction tx,
Tuple keyRec) {
Objects.requireNonNull(keyRec);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
Row keyRow = marshal(keyRec, schemaVersion, true); // Convert to
portable format to pass TX/storage layer.
return tbl.get(keyRow, (InternalTransaction)
tx).thenApply(Objects::nonNull);
@@ -135,7 +141,7 @@ public class RecordBinaryViewImpl extends
AbstractTableView<Tuple> implements Re
/** {@inheritDoc} */
@Override
public void upsert(@Nullable Transaction tx, Tuple rec) {
- sync(upsertAsync(tx, rec));
+ sync(() -> upsertAsync(tx, rec));
}
/** {@inheritDoc} */
@@ -143,7 +149,7 @@ public class RecordBinaryViewImpl extends
AbstractTableView<Tuple> implements Re
public CompletableFuture<Void> upsertAsync(@Nullable Transaction tx, Tuple
rec) {
Objects.requireNonNull(rec);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
Row row = marshal(rec, schemaVersion, false);
return tbl.upsert(row, (InternalTransaction) tx);
@@ -153,7 +159,7 @@ public class RecordBinaryViewImpl extends
AbstractTableView<Tuple> implements Re
/** {@inheritDoc} */
@Override
public void upsertAll(@Nullable Transaction tx, Collection<Tuple> recs) {
- sync(upsertAllAsync(tx, recs));
+ sync(() -> upsertAllAsync(tx, recs));
}
/** {@inheritDoc} */
@@ -161,7 +167,7 @@ public class RecordBinaryViewImpl extends
AbstractTableView<Tuple> implements Re
public CompletableFuture<Void> upsertAllAsync(@Nullable Transaction tx,
Collection<Tuple> recs) {
Objects.requireNonNull(recs);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
return tbl.upsertAll(mapToBinary(recs, schemaVersion, false),
(InternalTransaction) tx);
});
}
@@ -169,7 +175,7 @@ public class RecordBinaryViewImpl extends
AbstractTableView<Tuple> implements Re
/** {@inheritDoc} */
@Override
public Tuple getAndUpsert(@Nullable Transaction tx, Tuple rec) {
- return sync(getAndUpsertAsync(tx, rec));
+ return sync(() -> getAndUpsertAsync(tx, rec));
}
/** {@inheritDoc} */
@@ -177,7 +183,7 @@ public class RecordBinaryViewImpl extends
AbstractTableView<Tuple> implements Re
public CompletableFuture<Tuple> getAndUpsertAsync(@Nullable Transaction
tx, Tuple rec) {
Objects.requireNonNull(rec);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
Row row = marshal(rec, schemaVersion, false);
return tbl.getAndUpsert(row, (InternalTransaction)
tx).thenApply(resultRow -> wrap(resultRow, schemaVersion));
@@ -187,7 +193,7 @@ public class RecordBinaryViewImpl extends
AbstractTableView<Tuple> implements Re
/** {@inheritDoc} */
@Override
public boolean insert(@Nullable Transaction tx, Tuple rec) {
- return sync(insertAsync(tx, rec));
+ return sync(() -> insertAsync(tx, rec));
}
/** {@inheritDoc} */
@@ -195,7 +201,7 @@ public class RecordBinaryViewImpl extends
AbstractTableView<Tuple> implements Re
public CompletableFuture<Boolean> insertAsync(@Nullable Transaction tx,
Tuple rec) {
Objects.requireNonNull(rec);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
Row row = marshal(rec, schemaVersion, false);
return tbl.insert(row, (InternalTransaction) tx);
@@ -205,7 +211,7 @@ public class RecordBinaryViewImpl extends
AbstractTableView<Tuple> implements Re
/** {@inheritDoc} */
@Override
public List<Tuple> insertAll(@Nullable Transaction tx, Collection<Tuple>
recs) {
- return sync(insertAllAsync(tx, recs));
+ return sync(() -> insertAllAsync(tx, recs));
}
/** {@inheritDoc} */
@@ -213,7 +219,7 @@ public class RecordBinaryViewImpl extends
AbstractTableView<Tuple> implements Re
public CompletableFuture<List<Tuple>> insertAllAsync(@Nullable Transaction
tx, Collection<Tuple> recs) {
Objects.requireNonNull(recs);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
return tbl.insertAll(mapToBinary(recs, schemaVersion, false),
(InternalTransaction) tx)
.thenApply(rows -> wrap(rows, schemaVersion, false));
});
@@ -222,13 +228,13 @@ public class RecordBinaryViewImpl extends
AbstractTableView<Tuple> implements Re
/** {@inheritDoc} */
@Override
public boolean replace(@Nullable Transaction tx, Tuple rec) {
- return sync(replaceAsync(tx, rec));
+ return sync(() -> replaceAsync(tx, rec));
}
/** {@inheritDoc} */
@Override
public boolean replace(@Nullable Transaction tx, Tuple oldRec, Tuple
newRec) {
- return sync(replaceAsync(tx, oldRec, newRec));
+ return sync(() -> replaceAsync(tx, oldRec, newRec));
}
/** {@inheritDoc} */
@@ -236,7 +242,7 @@ public class RecordBinaryViewImpl extends
AbstractTableView<Tuple> implements Re
public CompletableFuture<Boolean> replaceAsync(@Nullable Transaction tx,
Tuple rec) {
Objects.requireNonNull(rec);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
Row row = marshal(rec, schemaVersion, false);
return tbl.replace(row, (InternalTransaction) tx);
@@ -249,7 +255,7 @@ public class RecordBinaryViewImpl extends
AbstractTableView<Tuple> implements Re
Objects.requireNonNull(oldRec);
Objects.requireNonNull(newRec);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
Row oldRow = marshal(oldRec, schemaVersion, false);
Row newRow = marshal(newRec, schemaVersion, false);
@@ -260,7 +266,7 @@ public class RecordBinaryViewImpl extends
AbstractTableView<Tuple> implements Re
/** {@inheritDoc} */
@Override
public Tuple getAndReplace(@Nullable Transaction tx, Tuple rec) {
- return sync(getAndReplaceAsync(tx, rec));
+ return sync(() -> getAndReplaceAsync(tx, rec));
}
/** {@inheritDoc} */
@@ -268,7 +274,7 @@ public class RecordBinaryViewImpl extends
AbstractTableView<Tuple> implements Re
public CompletableFuture<Tuple> getAndReplaceAsync(@Nullable Transaction
tx, Tuple rec) {
Objects.requireNonNull(rec);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
Row row = marshal(rec, schemaVersion, false);
return tbl.getAndReplace(row, (InternalTransaction)
tx).thenApply(resultRow -> wrap(resultRow, schemaVersion));
@@ -278,7 +284,7 @@ public class RecordBinaryViewImpl extends
AbstractTableView<Tuple> implements Re
/** {@inheritDoc} */
@Override
public boolean delete(@Nullable Transaction tx, Tuple keyRec) {
- return sync(deleteAsync(tx, keyRec));
+ return sync(() -> deleteAsync(tx, keyRec));
}
/** {@inheritDoc} */
@@ -286,7 +292,7 @@ public class RecordBinaryViewImpl extends
AbstractTableView<Tuple> implements Re
public CompletableFuture<Boolean> deleteAsync(@Nullable Transaction tx,
Tuple keyRec) {
Objects.requireNonNull(keyRec);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
Row keyRow = marshal(keyRec, schemaVersion, true);
return tbl.delete(keyRow, (InternalTransaction) tx);
@@ -296,7 +302,7 @@ public class RecordBinaryViewImpl extends
AbstractTableView<Tuple> implements Re
/** {@inheritDoc} */
@Override
public boolean deleteExact(@Nullable Transaction tx, Tuple rec) {
- return sync(deleteExactAsync(tx, rec));
+ return sync(() -> deleteExactAsync(tx, rec));
}
/** {@inheritDoc} */
@@ -304,7 +310,7 @@ public class RecordBinaryViewImpl extends
AbstractTableView<Tuple> implements Re
public CompletableFuture<Boolean> deleteExactAsync(@Nullable Transaction
tx, Tuple rec) {
Objects.requireNonNull(rec);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
Row row = marshal(rec, schemaVersion, false);
return tbl.deleteExact(row, (InternalTransaction) tx);
@@ -314,7 +320,7 @@ public class RecordBinaryViewImpl extends
AbstractTableView<Tuple> implements Re
/** {@inheritDoc} */
@Override
public Tuple getAndDelete(@Nullable Transaction tx, Tuple keyRec) {
- return sync(getAndDeleteAsync(tx, keyRec));
+ return sync(() -> getAndDeleteAsync(tx, keyRec));
}
/** {@inheritDoc} */
@@ -322,7 +328,7 @@ public class RecordBinaryViewImpl extends
AbstractTableView<Tuple> implements Re
public CompletableFuture<Tuple> getAndDeleteAsync(@Nullable Transaction
tx, Tuple keyRec) {
Objects.requireNonNull(keyRec);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
Row keyRow = marshal(keyRec, schemaVersion, true);
return tbl.getAndDelete(keyRow, (InternalTransaction)
tx).thenApply(row -> wrap(row, schemaVersion));
@@ -332,7 +338,7 @@ public class RecordBinaryViewImpl extends
AbstractTableView<Tuple> implements Re
/** {@inheritDoc} */
@Override
public List<Tuple> deleteAll(@Nullable Transaction tx, Collection<Tuple>
keyRecs) {
- return sync(deleteAllAsync(tx, keyRecs));
+ return sync(() -> deleteAllAsync(tx, keyRecs));
}
/** {@inheritDoc} */
@@ -340,7 +346,7 @@ public class RecordBinaryViewImpl extends
AbstractTableView<Tuple> implements Re
public CompletableFuture<List<Tuple>> deleteAllAsync(@Nullable Transaction
tx, Collection<Tuple> keyRecs) {
Objects.requireNonNull(keyRecs);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
return tbl.deleteAll(mapToBinary(keyRecs, schemaVersion, true),
(InternalTransaction) tx)
.thenApply(rows -> wrapKeys(rows, schemaVersion));
});
@@ -349,7 +355,7 @@ public class RecordBinaryViewImpl extends
AbstractTableView<Tuple> implements Re
/** {@inheritDoc} */
@Override
public List<Tuple> deleteAllExact(@Nullable Transaction tx,
Collection<Tuple> recs) {
- return sync(deleteAllExactAsync(tx, recs));
+ return sync(() -> deleteAllExactAsync(tx, recs));
}
/** {@inheritDoc} */
@@ -357,7 +363,7 @@ public class RecordBinaryViewImpl extends
AbstractTableView<Tuple> implements Re
public CompletableFuture<List<Tuple>> deleteAllExactAsync(@Nullable
Transaction tx, Collection<Tuple> recs) {
Objects.requireNonNull(recs);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
return tbl.deleteAllExact(mapToBinary(recs, schemaVersion, false),
(InternalTransaction) tx)
.thenApply(rows -> wrap(rows, schemaVersion, false));
});
@@ -484,9 +490,13 @@ public class RecordBinaryViewImpl extends
AbstractTableView<Tuple> implements Re
Objects.requireNonNull(publisher);
var partitioner = new
TupleStreamerPartitionAwarenessProvider(rowConverter.registry(),
tbl.partitions());
- StreamerBatchSender<Tuple, Integer> batchSender = this::updateAll;
+ StreamerBatchSender<Tuple, Integer> batchSender = (partitionId, rows,
deleted) ->
+ withSchemaSync(null,
+ schemaVersion -> this.tbl.updateAll(mapToBinary(rows,
schemaVersion, deleted), deleted, partitionId)
+ );
- return DataStreamer.streamData(publisher, options, batchSender,
partitioner);
+ CompletableFuture<Void> future = DataStreamer.streamData(publisher,
options, batchSender, partitioner);
+ return convertToPublicFuture(preventThreadHijack(future));
}
/**
@@ -498,7 +508,7 @@ public class RecordBinaryViewImpl extends
AbstractTableView<Tuple> implements Re
* @return Future that will be completed when the stream is finished.
*/
public CompletableFuture<Void> updateAll(int partitionId,
Collection<Tuple> rows, @Nullable BitSet deleted) {
- return withSchemaSync(null,
+ return doOperation(null,
schemaVersion -> this.tbl.updateAll(mapToBinary(rows,
schemaVersion, deleted), deleted, partitionId));
}
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
index cf7ec8df18..279c09e827 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.table;
+import static
org.apache.ignite.internal.lang.IgniteExceptionMapperUtil.convertToPublicFuture;
+
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
@@ -24,6 +26,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
import java.util.concurrent.Flow.Publisher;
import java.util.function.Function;
import org.apache.ignite.internal.marshaller.Marshaller;
@@ -73,19 +76,22 @@ public class RecordViewImpl<R> extends AbstractTableView<R>
implements RecordVie
* @param tbl Table.
* @param schemaRegistry Schema registry.
* @param schemaVersions Schema versions access.
+ * @param sql Ignite SQL facade.
* @param marshallers Marshallers provider.
+ * @param asyncContinuationExecutor Executor to which execution will be
resubmitted when leaving asynchronous public API
+ * endpoints (so as to prevent the user from stealing Ignite
threads).
* @param mapper Record class mapper.
- * @param sql Ignite SQL facade.
*/
public RecordViewImpl(
InternalTable tbl,
SchemaRegistry schemaRegistry,
SchemaVersions schemaVersions,
+ IgniteSql sql,
MarshallersProvider marshallers,
- Mapper<R> mapper,
- IgniteSql sql
+ Executor asyncContinuationExecutor,
+ Mapper<R> mapper
) {
- super(tbl, schemaVersions, schemaRegistry, sql, marshallers);
+ super(tbl, schemaVersions, schemaRegistry, sql, marshallers,
asyncContinuationExecutor);
this.mapper = mapper;
marshallerFactory = (schema) -> new RecordMarshallerImpl<>(schema,
marshallers, mapper);
@@ -94,7 +100,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R>
implements RecordVie
/** {@inheritDoc} */
@Override
public R get(@Nullable Transaction tx, R keyRec) {
- return sync(getAsync(tx, keyRec));
+ return sync(() -> getAsync(tx, keyRec));
}
/** {@inheritDoc} */
@@ -102,7 +108,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R>
implements RecordVie
public CompletableFuture<R> getAsync(@Nullable Transaction tx, R keyRec) {
Objects.requireNonNull(keyRec);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
BinaryRowEx keyRow = marshalKey(keyRec, schemaVersion);
return tbl.get(keyRow, (InternalTransaction)
tx).thenApply(binaryRow -> unmarshal(binaryRow, schemaVersion));
@@ -111,14 +117,14 @@ public class RecordViewImpl<R> extends
AbstractTableView<R> implements RecordVie
@Override
public List<R> getAll(@Nullable Transaction tx, Collection<R> keyRecs) {
- return sync(getAllAsync(tx, keyRecs));
+ return sync(() -> getAllAsync(tx, keyRecs));
}
@Override
public CompletableFuture<List<R>> getAllAsync(@Nullable Transaction tx,
Collection<R> keyRecs) {
Objects.requireNonNull(keyRecs);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
return tbl.getAll(marshalKeys(keyRecs, schemaVersion),
(InternalTransaction) tx)
.thenApply(binaryRows -> unmarshal(binaryRows,
schemaVersion, true));
});
@@ -127,7 +133,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R>
implements RecordVie
/** {@inheritDoc} */
@Override
public boolean contains(@Nullable Transaction tx, R keyRec) {
- return sync(containsAsync(tx, keyRec));
+ return sync(() -> containsAsync(tx, keyRec));
}
/** {@inheritDoc} */
@@ -135,7 +141,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R>
implements RecordVie
public CompletableFuture<Boolean> containsAsync(@Nullable Transaction tx,
R keyRec) {
Objects.requireNonNull(keyRec);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
BinaryRowEx keyRow = marshalKey(keyRec, schemaVersion);
return tbl.get(keyRow, (InternalTransaction)
tx).thenApply(Objects::nonNull);
@@ -145,7 +151,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R>
implements RecordVie
/** {@inheritDoc} */
@Override
public void upsert(@Nullable Transaction tx, R rec) {
- sync(upsertAsync(tx, rec));
+ sync(() -> upsertAsync(tx, rec));
}
/** {@inheritDoc} */
@@ -153,7 +159,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R>
implements RecordVie
public CompletableFuture<Void> upsertAsync(@Nullable Transaction tx, R
rec) {
Objects.requireNonNull(rec);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
BinaryRowEx keyRow = marshal(rec, schemaVersion);
return tbl.upsert(keyRow, (InternalTransaction) tx);
@@ -163,7 +169,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R>
implements RecordVie
/** {@inheritDoc} */
@Override
public void upsertAll(@Nullable Transaction tx, Collection<R> recs) {
- sync(upsertAllAsync(tx, recs));
+ sync(() -> upsertAllAsync(tx, recs));
}
/** {@inheritDoc} */
@@ -171,7 +177,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R>
implements RecordVie
public CompletableFuture<Void> upsertAllAsync(@Nullable Transaction tx,
Collection<R> recs) {
Objects.requireNonNull(recs);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
return tbl.upsertAll(marshal(recs, schemaVersion),
(InternalTransaction) tx);
});
}
@@ -179,7 +185,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R>
implements RecordVie
/** {@inheritDoc} */
@Override
public R getAndUpsert(@Nullable Transaction tx, R rec) {
- return sync(getAndUpsertAsync(tx, rec));
+ return sync(() -> getAndUpsertAsync(tx, rec));
}
/** {@inheritDoc} */
@@ -187,7 +193,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R>
implements RecordVie
public CompletableFuture<R> getAndUpsertAsync(@Nullable Transaction tx, R
rec) {
Objects.requireNonNull(rec);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
BinaryRowEx keyRow = marshal(rec, schemaVersion);
return tbl.getAndUpsert(keyRow, (InternalTransaction)
tx).thenApply(binaryRow -> unmarshal(binaryRow, schemaVersion));
@@ -197,7 +203,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R>
implements RecordVie
/** {@inheritDoc} */
@Override
public boolean insert(@Nullable Transaction tx, R rec) {
- return sync(insertAsync(tx, rec));
+ return sync(() -> insertAsync(tx, rec));
}
/** {@inheritDoc} */
@@ -205,7 +211,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R>
implements RecordVie
public CompletableFuture<Boolean> insertAsync(@Nullable Transaction tx, R
rec) {
Objects.requireNonNull(rec);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
BinaryRowEx keyRow = marshal(rec, schemaVersion);
return tbl.insert(keyRow, (InternalTransaction) tx);
@@ -215,7 +221,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R>
implements RecordVie
/** {@inheritDoc} */
@Override
public List<R> insertAll(@Nullable Transaction tx, Collection<R> recs) {
- return sync(insertAllAsync(tx, recs));
+ return sync(() -> insertAllAsync(tx, recs));
}
/** {@inheritDoc} */
@@ -223,7 +229,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R>
implements RecordVie
public CompletableFuture<List<R>> insertAllAsync(@Nullable Transaction tx,
Collection<R> recs) {
Objects.requireNonNull(recs);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
Collection<BinaryRowEx> rows = marshal(recs, schemaVersion);
return tbl.insertAll(rows, (InternalTransaction)
tx).thenApply(binaryRows -> unmarshal(binaryRows, schemaVersion, false));
@@ -233,13 +239,13 @@ public class RecordViewImpl<R> extends
AbstractTableView<R> implements RecordVie
/** {@inheritDoc} */
@Override
public boolean replace(@Nullable Transaction tx, R rec) {
- return sync(replaceAsync(tx, rec));
+ return sync(() -> replaceAsync(tx, rec));
}
/** {@inheritDoc} */
@Override
public boolean replace(@Nullable Transaction tx, R oldRec, R newRec) {
- return sync(replaceAsync(tx, oldRec, newRec));
+ return sync(() -> replaceAsync(tx, oldRec, newRec));
}
/** {@inheritDoc} */
@@ -247,7 +253,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R>
implements RecordVie
public CompletableFuture<Boolean> replaceAsync(@Nullable Transaction tx, R
rec) {
Objects.requireNonNull(rec);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
BinaryRowEx newRow = marshal(rec, schemaVersion);
return tbl.replace(newRow, (InternalTransaction) tx);
@@ -260,7 +266,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R>
implements RecordVie
Objects.requireNonNull(oldRec);
Objects.requireNonNull(newRec);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
BinaryRowEx oldRow = marshal(oldRec, schemaVersion);
BinaryRowEx newRow = marshal(newRec, schemaVersion);
@@ -271,7 +277,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R>
implements RecordVie
/** {@inheritDoc} */
@Override
public R getAndReplace(@Nullable Transaction tx, R rec) {
- return sync(getAndReplaceAsync(tx, rec));
+ return sync(() -> getAndReplaceAsync(tx, rec));
}
/** {@inheritDoc} */
@@ -279,7 +285,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R>
implements RecordVie
public CompletableFuture<R> getAndReplaceAsync(@Nullable Transaction tx, R
rec) {
Objects.requireNonNull(rec);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
BinaryRowEx row = marshal(rec, schemaVersion);
return tbl.getAndReplace(row, (InternalTransaction)
tx).thenApply(binaryRow -> unmarshal(binaryRow, schemaVersion));
@@ -289,7 +295,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R>
implements RecordVie
/** {@inheritDoc} */
@Override
public boolean delete(@Nullable Transaction tx, R keyRec) {
- return sync(deleteAsync(tx, keyRec));
+ return sync(() -> deleteAsync(tx, keyRec));
}
/** {@inheritDoc} */
@@ -297,7 +303,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R>
implements RecordVie
public CompletableFuture<Boolean> deleteAsync(@Nullable Transaction tx, R
keyRec) {
Objects.requireNonNull(keyRec);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
BinaryRowEx row = marshalKey(keyRec, schemaVersion);
return tbl.delete(row, (InternalTransaction) tx);
@@ -307,7 +313,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R>
implements RecordVie
/** {@inheritDoc} */
@Override
public boolean deleteExact(@Nullable Transaction tx, R rec) {
- return sync(deleteExactAsync(tx, rec));
+ return sync(() -> deleteExactAsync(tx, rec));
}
/** {@inheritDoc} */
@@ -315,7 +321,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R>
implements RecordVie
public CompletableFuture<Boolean> deleteExactAsync(@Nullable Transaction
tx, R keyRec) {
Objects.requireNonNull(keyRec);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
BinaryRowEx row = marshal(keyRec, schemaVersion);
return tbl.deleteExact(row, (InternalTransaction) tx);
@@ -325,7 +331,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R>
implements RecordVie
/** {@inheritDoc} */
@Override
public R getAndDelete(@Nullable Transaction tx, R keyRec) {
- return sync(getAndDeleteAsync(tx, keyRec));
+ return sync(() -> getAndDeleteAsync(tx, keyRec));
}
/** {@inheritDoc} */
@@ -333,7 +339,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R>
implements RecordVie
public CompletableFuture<R> getAndDeleteAsync(@Nullable Transaction tx, R
keyRec) {
Objects.requireNonNull(keyRec);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
BinaryRowEx row = marshalKey(keyRec, schemaVersion);
return tbl.getAndDelete(row, (InternalTransaction)
tx).thenApply(binaryRow -> unmarshal(binaryRow, schemaVersion));
@@ -343,7 +349,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R>
implements RecordVie
/** {@inheritDoc} */
@Override
public List<R> deleteAll(@Nullable Transaction tx, Collection<R> keyRecs) {
- return sync(deleteAllAsync(tx, keyRecs));
+ return sync(() -> deleteAllAsync(tx, keyRecs));
}
/** {@inheritDoc} */
@@ -351,7 +357,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R>
implements RecordVie
public CompletableFuture<List<R>> deleteAllAsync(@Nullable Transaction tx,
Collection<R> keyRecs) {
Objects.requireNonNull(keyRecs);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
Collection<BinaryRowEx> rows = marshal(keyRecs, schemaVersion);
return tbl.deleteAll(rows, (InternalTransaction)
tx).thenApply(binaryRows -> unmarshal(binaryRows, schemaVersion, false));
@@ -361,7 +367,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R>
implements RecordVie
/** {@inheritDoc} */
@Override
public List<R> deleteAllExact(@Nullable Transaction tx, Collection<R>
recs) {
- return sync(deleteAllExactAsync(tx, recs));
+ return sync(() -> deleteAllExactAsync(tx, recs));
}
/** {@inheritDoc} */
@@ -369,7 +375,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R>
implements RecordVie
public CompletableFuture<List<R>> deleteAllExactAsync(@Nullable
Transaction tx, Collection<R> keyRecs) {
Objects.requireNonNull(keyRecs);
- return withSchemaSync(tx, (schemaVersion) -> {
+ return doOperation(tx, (schemaVersion) -> {
Collection<BinaryRowEx> rows = marshal(keyRecs, schemaVersion);
return tbl.deleteAllExact(rows, (InternalTransaction) tx)
@@ -572,9 +578,11 @@ public class RecordViewImpl<R> extends
AbstractTableView<R> implements RecordVie
StreamerBatchSender<R, Integer> batchSender = (partitionId, items,
deleted) ->
withSchemaSync(
null,
- schemaVersion -> this.tbl.updateAll(marshal(items,
schemaVersion, deleted), deleted, partitionId));
+ schemaVersion -> this.tbl.updateAll(marshal(items,
schemaVersion, deleted), deleted, partitionId)
+ );
- return DataStreamer.streamData(publisher, options, batchSender,
partitioner);
+ CompletableFuture<Void> future = DataStreamer.streamData(publisher,
options, batchSender, partitioner);
+ return convertToPublicFuture(preventThreadHijack(future));
}
/** {@inheritDoc} */
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
index e29c7667aa..11d805f409 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
@@ -23,6 +23,8 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.marshaller.MarshallerException;
@@ -76,6 +78,8 @@ public class TableImpl implements TableViewInternal {
private final int pkId;
+ private final Executor asyncContinuationExecutor;
+
/**
* Constructor.
*
@@ -85,6 +89,8 @@ public class TableImpl implements TableViewInternal {
* @param marshallers Marshallers provider.
* @param sql Ignite SQL facade.
* @param pkId ID of a primary index.
+ * @param asyncContinuationExecutor Executor to which execution will be
resubmitted when leaving asynchronous public API endpoints
+ * (so as to prevent the user from stealing Ignite threads).
*/
public TableImpl(
InternalTable tbl,
@@ -92,7 +98,8 @@ public class TableImpl implements TableViewInternal {
SchemaVersions schemaVersions,
MarshallersProvider marshallers,
IgniteSql sql,
- int pkId
+ int pkId,
+ Executor asyncContinuationExecutor
) {
this.tbl = tbl;
this.lockManager = lockManager;
@@ -100,6 +107,7 @@ public class TableImpl implements TableViewInternal {
this.marshallers = marshallers;
this.sql = sql;
this.pkId = pkId;
+ this.asyncContinuationExecutor = asyncContinuationExecutor;
}
/**
@@ -121,7 +129,7 @@ public class TableImpl implements TableViewInternal {
IgniteSql sql,
int pkId
) {
- this(tbl, lockManager, schemaVersions, new
ReflectionMarshallersProvider(), sql, pkId);
+ this(tbl, lockManager, schemaVersions, new
ReflectionMarshallersProvider(), sql, pkId, ForkJoinPool.commonPool());
this.schemaReg = schemaReg;
}
@@ -166,22 +174,22 @@ public class TableImpl implements TableViewInternal {
@Override
public <R> RecordView<R> recordView(Mapper<R> recMapper) {
- return new RecordViewImpl<>(tbl, schemaReg, schemaVersions,
marshallers, recMapper, sql);
+ return new RecordViewImpl<>(tbl, schemaReg, schemaVersions, sql,
marshallers, asyncContinuationExecutor, recMapper);
}
@Override
public RecordView<Tuple> recordView() {
- return new RecordBinaryViewImpl(tbl, schemaReg, schemaVersions,
marshallers, sql);
+ return new RecordBinaryViewImpl(tbl, schemaReg, schemaVersions, sql,
marshallers, asyncContinuationExecutor);
}
@Override
public <K, V> KeyValueView<K, V> keyValueView(Mapper<K> keyMapper,
Mapper<V> valMapper) {
- return new KeyValueViewImpl<>(tbl, schemaReg, schemaVersions,
marshallers, sql, keyMapper, valMapper);
+ return new KeyValueViewImpl<>(tbl, schemaReg, schemaVersions, sql,
marshallers, asyncContinuationExecutor, keyMapper, valMapper);
}
@Override
public KeyValueView<Tuple, Tuple> keyValueView() {
- return new KeyValueBinaryViewImpl(tbl, schemaReg, schemaVersions,
marshallers, sql);
+ return new KeyValueBinaryViewImpl(tbl, schemaReg, schemaVersions, sql,
marshallers, asyncContinuationExecutor);
}
@Override
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 561ff10ddf..89e5676705 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -391,6 +391,8 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
private final RemotelyTriggeredResourceRegistry
remotelyTriggeredResourceRegistry;
+ private final Executor asyncContinuationExecutor;
+
/**
* Creates a new table manager.
*
@@ -416,6 +418,8 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
* @param sql A supplier function that returns {@link IgniteSql}.
* @param rebalanceScheduler Executor for scheduling rebalance routine.
* @param lowWatermark Low watermark.
+ * @param asyncContinuationExecutor Executor to which execution will be
resubmitted when leaving asynchronous public API endpoints
+ * (so as to prevent the user from stealing Ignite threads).
*/
public TableManager(
String nodeName,
@@ -448,7 +452,8 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
Supplier<IgniteSql> sql,
RemotelyTriggeredResourceRegistry
remotelyTriggeredResourceRegistry,
ScheduledExecutorService rebalanceScheduler,
- LowWatermark lowWatermark
+ LowWatermark lowWatermark,
+ Executor asyncContinuationExecutor
) {
this.topologyService = topologyService;
this.raftMgr = raftMgr;
@@ -473,6 +478,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
this.remotelyTriggeredResourceRegistry =
remotelyTriggeredResourceRegistry;
this.rebalanceScheduler = rebalanceScheduler;
this.lowWatermark = lowWatermark;
+ this.asyncContinuationExecutor = asyncContinuationExecutor;
this.executorInclinedSchemaSyncService = new
ExecutorInclinedSchemaSyncService(schemaSyncService,
partitionOperationsExecutor);
this.executorInclinedPlacementDriver = new
ExecutorInclinedPlacementDriver(placementDriver, partitionOperationsExecutor);
@@ -1267,7 +1273,15 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
tableRaftService
);
- var table = new TableImpl(internalTable, lockMgr, schemaVersions,
marshallers, sql.get(), tableDescriptor.primaryKeyIndexId());
+ var table = new TableImpl(
+ internalTable,
+ lockMgr,
+ schemaVersions,
+ marshallers,
+ sql.get(),
+ tableDescriptor.primaryKeyIndexId(),
+ asyncContinuationExecutor
+ );
tablesVv.update(causalityToken, (ignore, e) -> inBusyLock(busyLock, ()
-> {
if (e != null) {
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueBinaryViewOperationsTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueBinaryViewOperationsTest.java
index 53faae7d06..19636d9db6 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueBinaryViewOperationsTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueBinaryViewOperationsTest.java
@@ -37,6 +37,7 @@ import static org.mockito.Mockito.verify;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ForkJoinPool;
import org.apache.ignite.internal.marshaller.ReflectionMarshallersProvider;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.Column;
@@ -458,7 +459,9 @@ public class KeyValueBinaryViewOperationsTest extends
TableKvOperationsTestBase
internalTable,
new DummySchemaManagerImpl(schema),
schemaVersions,
- marshallers, mock(IgniteSql.class)
+ mock(IgniteSql.class),
+ marshallers,
+ ForkJoinPool.commonPool()
);
BinaryRow resultRow = new
TupleMarshallerImpl(schema).marshal(Tuple.create().set("ID", 1L).set("VAL",
2L));
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueViewOperationsTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueViewOperationsTest.java
index b998131b97..9cb632d123 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueViewOperationsTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueViewOperationsTest.java
@@ -57,6 +57,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import org.apache.ignite.internal.marshaller.MarshallersProvider;
import org.apache.ignite.internal.marshaller.ReflectionMarshallersProvider;
@@ -726,8 +727,9 @@ public class KeyValueViewOperationsTest extends
TableKvOperationsTestBase {
internalTable,
new DummySchemaManagerImpl(schema),
schemaVersions,
- marshallers,
mock(IgniteSql.class),
+ marshallers,
+ ForkJoinPool.commonPool(),
keyMapper,
valMapper
);
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/RecordBinaryViewOperationsTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/RecordBinaryViewOperationsTest.java
index cb8fe29c2c..f5904e412f 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/RecordBinaryViewOperationsTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/RecordBinaryViewOperationsTest.java
@@ -40,6 +40,7 @@ import static org.mockito.Mockito.verify;
import java.util.Collection;
import java.util.List;
+import java.util.concurrent.ForkJoinPool;
import org.apache.ignite.internal.marshaller.ReflectionMarshallersProvider;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.Column;
@@ -611,7 +612,12 @@ public class RecordBinaryViewOperationsTest extends
TableKvOperationsTestBase {
ReflectionMarshallersProvider marshallers = new
ReflectionMarshallersProvider();
RecordView<Tuple> view = new RecordBinaryViewImpl(
- internalTable, new DummySchemaManagerImpl(schema),
schemaVersions, marshallers, mock(IgniteSql.class)
+ internalTable,
+ new DummySchemaManagerImpl(schema),
+ schemaVersions,
+ mock(IgniteSql.class),
+ marshallers,
+ ForkJoinPool.commonPool()
);
BinaryRow resultRow = new
TupleMarshallerImpl(schema).marshal(Tuple.create().set("id", 1L).set("val",
2L));
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/RecordViewOperationsTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/RecordViewOperationsTest.java
index 64a80bb584..f3918d3a96 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/RecordViewOperationsTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/RecordViewOperationsTest.java
@@ -55,6 +55,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import org.apache.ignite.internal.marshaller.ReflectionMarshallersProvider;
import
org.apache.ignite.internal.marshaller.testobjects.TestObjectWithAllTypes;
@@ -409,9 +410,10 @@ public class RecordViewOperationsTest extends
TableKvOperationsTestBase {
internalTable,
new DummySchemaManagerImpl(schema),
schemaVersions,
+ mock(IgniteSql.class),
marshallers,
- recMapper,
- mock(IgniteSql.class)
+ ForkJoinPool.commonPool(),
+ recMapper
);
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 7b041ea1a6..2fa2603f43 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -65,6 +65,7 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Phaser;
import java.util.concurrent.ScheduledExecutorService;
@@ -777,7 +778,8 @@ public class TableManagerTest extends IgniteAbstractTest {
() -> mock(IgniteSql.class),
new RemotelyTriggeredResourceRegistry(),
mock(ScheduledExecutorService.class),
- new LowWatermark(NODE_NAME, gcConfig.lowWatermark(), clock,
tm, vaultManager, mock(FailureProcessor.class))
+ new LowWatermark(NODE_NAME, gcConfig.lowWatermark(), clock,
tm, vaultManager, mock(FailureProcessor.class)),
+ ForkJoinPool.commonPool()
) {
@Override