This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 655cf0fbb5 IGNITE-21869 Prevent thread hijacking via IgniteSql (#3512)
655cf0fbb5 is described below
commit 655cf0fbb53a8f160bcd2891ae24b68335d57532
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Mon Apr 1 17:03:10 2024 +0400
IGNITE-21869 Prevent thread hijacking via IgniteSql (#3512)
---
.../org/apache/ignite/internal/app/IgniteImpl.java | 11 +-
.../sql/threading/ItSqlApiThreadingTest.java | 237 +++++++++++++++++++++
.../internal/sql/api/AntiHijackAsyncResultSet.java | 74 +++++++
.../internal/sql/api/AntiHijackIgniteSql.java | 190 +++++++++++++++++
.../ignite/internal/sql/api/IgniteSqlImpl.java | 1 +
5 files changed, 510 insertions(+), 3 deletions(-)
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 91adcae0cb..b4ca5bf0b2 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -173,6 +173,7 @@ import
org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguratio
import
org.apache.ignite.internal.security.authentication.AuthenticationManager;
import
org.apache.ignite.internal.security.authentication.AuthenticationManagerImpl;
import org.apache.ignite.internal.security.configuration.SecurityConfiguration;
+import org.apache.ignite.internal.sql.api.AntiHijackIgniteSql;
import org.apache.ignite.internal.sql.api.IgniteSqlImpl;
import
org.apache.ignite.internal.sql.configuration.distributed.SqlDistributedConfiguration;
import
org.apache.ignite.internal.sql.configuration.local.SqlLocalConfiguration;
@@ -758,7 +759,7 @@ public class IgniteImpl implements Ignite {
catalogManager,
observableTimestampTracker,
placementDriverMgr.placementDriver(),
- this::sql,
+ this::bareSql,
resourcesRegistry,
rebalanceScheduler,
lowWatermark,
@@ -1213,10 +1214,14 @@ public class IgniteImpl implements Ignite {
return new AntiHijackIgniteTransactions(transactions,
asyncContinuationExecutor);
}
+ private IgniteSql bareSql() {
+ return sql;
+ }
+
/** {@inheritDoc} */
@Override
public IgniteSql sql() {
- return sql;
+ return new AntiHijackIgniteSql(sql, asyncContinuationExecutor);
}
/** {@inheritDoc} */
@@ -1251,7 +1256,7 @@ public class IgniteImpl implements Ignite {
@Override
public IgniteCatalog catalog(Options options) {
- return new IgniteCatalogSqlImpl(sql(), options);
+ return new IgniteCatalogSqlImpl(sql, options);
}
/**
diff --git
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/threading/ItSqlApiThreadingTest.java
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/threading/ItSqlApiThreadingTest.java
new file mode 100644
index 0000000000..ff6f5b841b
--- /dev/null
+++
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/threading/ItSqlApiThreadingTest.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.threading;
+
+import static java.lang.Thread.currentThread;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toMap;
+import static
org.apache.ignite.internal.PublicApiThreadingTests.anIgniteThread;
+import static
org.apache.ignite.internal.PublicApiThreadingTests.asyncContinuationPool;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.either;
+import static org.hamcrest.Matchers.is;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
+import org.apache.ignite.internal.PublicApiThreadingTests;
+import org.apache.ignite.internal.sql.api.IgniteSqlImpl;
+import org.apache.ignite.internal.wrapper.Wrappers;
+import org.apache.ignite.sql.BatchedArguments;
+import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.sql.async.AsyncResultSet;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.Table;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+@SuppressWarnings("resource")
+class ItSqlApiThreadingTest extends ClusterPerClassIntegrationTest {
+ private static final String TABLE_NAME = "test";
+
+ private static final String SELECT_QUERY = "SELECT * FROM " + TABLE_NAME;
+ private static final String UPDATE_QUERY = "UPDATE " + TABLE_NAME + " SET
val = val WHERE id = ?";
+
+ private static final int MORE_THAN_DEFAULT_STATEMENT_PAGE_SIZE = 2048;
+
+ @Override
+ protected int initialNodes() {
+ return 1;
+ }
+
+ @BeforeAll
+ void createTable() {
+ sql("CREATE TABLE " + TABLE_NAME + " (id INT PRIMARY KEY, val
VARCHAR)");
+
+ // Putting more than the doubled default query page size rows to make
sure that CriteriaQuerySource#query() returns a non-closed
+ // cursor even after we call its second page.
+ // TODO: Instead, configure pageSize=1 on each #query() call when
https://issues.apache.org/jira/browse/IGNITE-18647 is fixed.
+ Map<Integer, String> valuesForQuerying = IntStream.range(1, 1 + 2 *
MORE_THAN_DEFAULT_STATEMENT_PAGE_SIZE)
+ .boxed()
+ .collect(toMap(identity(), Object::toString));
+ plainKeyValueView().putAll(null, valuesForQuerying);
+ }
+
+ private static KeyValueView<Integer, String> plainKeyValueView() {
+ return testTable().keyValueView(Integer.class, String.class);
+ }
+
+ private static Table testTable() {
+ return CLUSTER.aliveNode().tables().table(TABLE_NAME);
+ }
+
+ private static IgniteSql igniteSqlForInternalUse() {
+ return Wrappers.unwrap(igniteSqlForPublicUse(), IgniteSqlImpl.class);
+ }
+
+ @ParameterizedTest
+ @EnumSource(SqlAsyncOperation.class)
+ void sqlFuturesCompleteInContinuationsPool(SqlAsyncOperation operation) {
+ CompletableFuture<Thread> completerFuture =
forcingSwitchFromUserThread(
+ () -> operation.executeOn(igniteSqlForPublicUse())
+ .thenApply(unused -> currentThread())
+ );
+
+ assertThat(completerFuture, willBe(asyncContinuationPool()));
+ }
+
+ private static IgniteSql igniteSqlForPublicUse() {
+ return CLUSTER.aliveNode().sql();
+ }
+
+ private static <T> T forcingSwitchFromUserThread(Supplier<? extends T>
action) {
+ return
PublicApiThreadingTests.forcingSwitchFromUserThread(CLUSTER.aliveNode(),
action);
+ }
+
+ @ParameterizedTest
+ @EnumSource(SqlAsyncOperation.class)
+ void
sqlFuturesFromInternalCallsAreNotResubmittedToContinuationsPool(SqlAsyncOperation
operation) {
+ CompletableFuture<Thread> completerFuture =
forcingSwitchFromUserThread(
+ () -> operation.executeOn(igniteSqlForInternalUse())
+ .thenApply(unused -> currentThread())
+ );
+
+ assertThat(completerFuture, willBe(anIgniteThread()));
+ }
+
+ @ParameterizedTest
+ @EnumSource(AsyncResultSetAsyncOperation.class)
+ void
asyncResultSetFuturesCompleteInContinuationsPool(AsyncResultSetAsyncOperation
operation) throws Exception {
+ AsyncResultSet<?> firstPage = fetchFirstPage(igniteSqlForPublicUse());
+
+ CompletableFuture<Thread> completerFuture =
operation.executeOn(firstPage)
+ .thenApply(unused -> currentThread());
+
+ // The future might get completed in the calling thread as we don't
force a wait inside Ignite
+ // (because we cannot do this with fetching next page or closing).
+ assertThat(completerFuture, willBe(
+ either(is(currentThread())).or(asyncContinuationPool())
+ ));
+ }
+
+ private static AsyncResultSet<SqlRow> fetchFirstPage(IgniteSql igniteSql)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ return igniteSql.executeAsync(null, SELECT_QUERY).get(10, SECONDS);
+ }
+
+ @ParameterizedTest
+ @EnumSource(AsyncResultSetAsyncOperation.class)
+ void
asyncResultSetFuturesFromInternalCallsAreNotResubmittedToContinuationsPool(AsyncResultSetAsyncOperation
operation)
+ throws Exception {
+ AsyncResultSet<?> firstPage =
fetchFirstPage(igniteSqlForInternalUse());
+
+ CompletableFuture<Thread> completerFuture =
operation.executeOn(firstPage)
+ .thenApply(unused -> currentThread());
+
+ // The future might get completed in the calling thread as we don't
force a wait inside Ignite
+ // (because we cannot do this with fetching next page or closing).
+ assertThat(completerFuture, willBe(
+ either(is(currentThread())).or(anIgniteThread())
+ ));
+ }
+
+ /**
+ * This test differs from {@link
#asyncResultSetFuturesCompleteInContinuationsPool(AsyncResultSetAsyncOperation)}
in that it obtains
+ * the future to test from a call on a ResultSet obtained from a
ResultSet, not from a view.
+ */
+ @ParameterizedTest
+ @EnumSource(AsyncResultSetAsyncOperation.class)
+ void
asyncResultSetFuturesAfterFetchCompleteInContinuationsPool(AsyncResultSetAsyncOperation
operation) throws Exception {
+ AsyncResultSet<?> firstPage = fetchFirstPage(igniteSqlForPublicUse());
+ AsyncResultSet<?> secondPage = firstPage.fetchNextPage().get(10,
SECONDS);
+
+ CompletableFuture<Thread> completerFuture =
operation.executeOn(secondPage)
+ .thenApply(unused -> currentThread());
+
+ // The future might get completed in the calling thread as we don't
force a wait inside Ignite
+ // (because we cannot do this with fetching next page or closing).
+ assertThat(completerFuture, willBe(
+ either(is(currentThread())).or(asyncContinuationPool())
+ ));
+ }
+
+ /**
+ * This test differs from
+ * {@link
#asyncResultSetFuturesFromInternalCallsAreNotResubmittedToContinuationsPool(AsyncResultSetAsyncOperation)}
in that
+ * it obtains the future to test from a call on a ResultSet obtained from
a ResultSet, not from a view.
+ */
+ @ParameterizedTest
+ @EnumSource(AsyncResultSetAsyncOperation.class)
+ void
asyncResultSetFuturesAfterFetchFromInternalCallsAreNotResubmittedToContinuationsPool(AsyncResultSetAsyncOperation
operation)
+ throws Exception {
+ AsyncResultSet<?> firstPage =
fetchFirstPage(igniteSqlForInternalUse());
+ AsyncResultSet<?> secondPage = firstPage.fetchNextPage().get(10,
SECONDS);
+
+ CompletableFuture<Thread> completerFuture =
operation.executeOn(secondPage)
+ .thenApply(unused -> currentThread());
+
+ // The future might get completed in the calling thread as we don't
force a wait inside Ignite
+ // (because we cannot do this with fetching next page or closing).
+ assertThat(completerFuture, willBe(
+ either(is(currentThread())).or(anIgniteThread())
+ ));
+ }
+
+ private enum SqlAsyncOperation {
+ EXECUTE_QUERY_ASYNC(sql -> sql.executeAsync(null, SELECT_QUERY)),
+ EXECUTE_STATEMENT_ASYNC(sql -> sql.executeAsync(null,
sql.createStatement(SELECT_QUERY))),
+ // TODO: IGNITE-18695 - uncomment 2 following lines.
+ // EXECUTE_QUERY_WITH_MAPPER_ASYNC(sql -> sql.executeAsync(null,
(Mapper<?>) null, SELECT_QUERY)),
+ // EXECUTE_STATEMENT_WITH_MAPPER_ASYNC(sql -> sql.executeAsync(null,
(Mapper<?>) null, sql.createStatement(SELECT_QUERY))),
+ EXECUTE_BATCH_QUERY_ASYNC(sql -> sql.executeBatchAsync(null,
UPDATE_QUERY, BatchedArguments.of(10_000))),
+ // TODO: IGNITE-21872 - uncomment the following lines.
+ // EXECUTE_BATCH_STATEMENT_ASYNC(
+ // sql -> sql.executeBatchAsync(null,
sql.createStatement(UPDATE_QUERY), BatchedArguments.of(10_000))
+ // ),
+ EXECUTE_SCRIPT_ASYNC(sql -> sql.executeScriptAsync(SELECT_QUERY));
+
+ private final Function<IgniteSql, CompletableFuture<?>> action;
+
+ SqlAsyncOperation(Function<IgniteSql, CompletableFuture<?>> action) {
+ this.action = action;
+ }
+
+ CompletableFuture<?> executeOn(IgniteSql sql) {
+ return action.apply(sql);
+ }
+ }
+
+ private enum AsyncResultSetAsyncOperation {
+ FETCH_NEXT_PAGE(resultSet -> resultSet.fetchNextPage()),
+ CLOSE(resultSet -> resultSet.closeAsync());
+
+ private final Function<AsyncResultSet<Object>, CompletableFuture<?>>
action;
+
+ AsyncResultSetAsyncOperation(Function<AsyncResultSet<Object>,
CompletableFuture<?>> action) {
+ this.action = action;
+ }
+
+ CompletableFuture<?> executeOn(AsyncResultSet<?> resultSet) {
+ return action.apply((AsyncResultSet<Object>) resultSet);
+ }
+ }
+}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AntiHijackAsyncResultSet.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AntiHijackAsyncResultSet.java
new file mode 100644
index 0000000000..4c65d2a8ef
--- /dev/null
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AntiHijackAsyncResultSet.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.api;
+
+import static
org.apache.ignite.internal.thread.PublicApiThreading.preventThreadHijack;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import org.apache.ignite.internal.table.AntiHijackAsyncCursor;
+import org.apache.ignite.internal.thread.PublicApiThreading;
+import org.apache.ignite.sql.ResultSetMetadata;
+import org.apache.ignite.sql.async.AsyncResultSet;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Wrapper around {@link AsyncResultSet} which prevents Ignite internal
threads from being hijacked by the user via asynchronous methods.
+ *
+ * @see PublicApiThreading
+ */
+public class AntiHijackAsyncResultSet<T> extends AntiHijackAsyncCursor<T>
implements AsyncResultSet<T> {
+ private final AsyncResultSet<T> resultSet;
+ private final Executor asyncContinuationExecutor;
+
+ /**
+ * Constructor.
+ */
+ public AntiHijackAsyncResultSet(AsyncResultSet<T> resultSet, Executor
asyncContinuationExecutor) {
+ super(resultSet, asyncContinuationExecutor);
+
+ this.resultSet = resultSet;
+ this.asyncContinuationExecutor = asyncContinuationExecutor;
+ }
+
+ @Override
+ public @Nullable ResultSetMetadata metadata() {
+ return resultSet.metadata();
+ }
+
+ @Override
+ public boolean hasRowSet() {
+ return resultSet.hasRowSet();
+ }
+
+ @Override
+ public long affectedRows() {
+ return resultSet.affectedRows();
+ }
+
+ @Override
+ public boolean wasApplied() {
+ return resultSet.wasApplied();
+ }
+
+ @Override
+ public CompletableFuture<? extends AsyncResultSet<T>> fetchNextPage() {
+ return preventThreadHijack(resultSet.fetchNextPage(),
asyncContinuationExecutor)
+ .thenApply(resultSet -> new
AntiHijackAsyncResultSet<>(resultSet, asyncContinuationExecutor));
+ }
+}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AntiHijackIgniteSql.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AntiHijackIgniteSql.java
new file mode 100644
index 0000000000..24e29c55d4
--- /dev/null
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AntiHijackIgniteSql.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.api;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Flow.Publisher;
+import org.apache.ignite.internal.thread.PublicApiThreading;
+import org.apache.ignite.internal.wrapper.Wrapper;
+import org.apache.ignite.sql.BatchedArguments;
+import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.sql.Statement;
+import org.apache.ignite.sql.Statement.StatementBuilder;
+import org.apache.ignite.sql.async.AsyncResultSet;
+import org.apache.ignite.sql.reactive.ReactiveResultSet;
+import org.apache.ignite.table.mapper.Mapper;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Wrapper around {@link IgniteSql} that adds protection agains thread
hijacking by users.
+ */
+public class AntiHijackIgniteSql implements IgniteSql, Wrapper {
+ private final IgniteSql sql;
+ private final Executor asyncContinuationExecutor;
+
+ public AntiHijackIgniteSql(IgniteSql sql, Executor
asyncContinuationExecutor) {
+ this.sql = sql;
+ this.asyncContinuationExecutor = asyncContinuationExecutor;
+ }
+
+ @Override
+ public Statement createStatement(String query) {
+ return sql.createStatement(query);
+ }
+
+ @Override
+ public StatementBuilder statementBuilder() {
+ return sql.statementBuilder();
+ }
+
+ @Override
+ public ResultSet<SqlRow> execute(@Nullable Transaction transaction, String
query, @Nullable Object... arguments) {
+ return sql.execute(transaction, query, arguments);
+ }
+
+ @Override
+ public ResultSet<SqlRow> execute(@Nullable Transaction transaction,
Statement statement, @Nullable Object... arguments) {
+ return sql.execute(transaction, statement, arguments);
+ }
+
+ @Override
+ public <T> ResultSet<T> execute(
+ @Nullable Transaction transaction,
+ @Nullable Mapper<T> mapper,
+ String query,
+ @Nullable Object... arguments
+ ) {
+ return sql.execute(transaction, mapper, query, arguments);
+ }
+
+ @Override
+ public <T> ResultSet<T> execute(
+ @Nullable Transaction transaction,
+ @Nullable Mapper<T> mapper,
+ Statement statement,
+ @Nullable Object... arguments
+ ) {
+ return sql.execute(transaction, mapper, statement, arguments);
+ }
+
+ @Override
+ public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
+ @Nullable Transaction transaction,
+ String query,
+ @Nullable Object... arguments
+ ) {
+ return preventThreadHijackForResultSet(sql.executeAsync(transaction,
query, arguments));
+ }
+
+ @Override
+ public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
+ @Nullable Transaction transaction,
+ Statement statement,
+ @Nullable Object... arguments
+ ) {
+ return preventThreadHijackForResultSet(sql.executeAsync(transaction,
statement, arguments));
+ }
+
+ @Override
+ public <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
+ @Nullable Transaction transaction,
+ @Nullable Mapper<T> mapper,
+ String query,
+ @Nullable Object... arguments
+ ) {
+ return preventThreadHijackForResultSet(sql.executeAsync(transaction,
mapper, query, arguments));
+ }
+
+ @Override
+ public <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
+ @Nullable Transaction transaction,
+ @Nullable Mapper<T> mapper,
+ Statement statement,
+ @Nullable Object... arguments
+ ) {
+ return preventThreadHijackForResultSet(sql.executeAsync(transaction,
mapper, statement, arguments));
+ }
+
+ @Override
+ public ReactiveResultSet executeReactive(@Nullable Transaction
transaction, String query, @Nullable Object... arguments) {
+ return sql.executeReactive(transaction, query, arguments);
+ }
+
+ @Override
+ public ReactiveResultSet executeReactive(@Nullable Transaction
transaction, Statement statement, @Nullable Object... arguments) {
+ return sql.executeReactive(transaction, statement, arguments);
+ }
+
+ @Override
+ public long[] executeBatch(@Nullable Transaction transaction, String
dmlQuery, BatchedArguments batch) {
+ return sql.executeBatch(transaction, dmlQuery, batch);
+ }
+
+ @Override
+ public long[] executeBatch(@Nullable Transaction transaction, Statement
dmlStatement, BatchedArguments batch) {
+ return sql.executeBatch(transaction, dmlStatement, batch);
+ }
+
+ @Override
+ public CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction
transaction, String query, BatchedArguments batch) {
+ return preventThreadHijack(sql.executeBatchAsync(transaction, query,
batch));
+ }
+
+ @Override
+ public CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction
transaction, Statement statement, BatchedArguments batch) {
+ return preventThreadHijack(sql.executeBatchAsync(transaction,
statement, batch));
+ }
+
+ @Override
+ public Publisher<Long> executeBatchReactive(@Nullable Transaction
transaction, String query, BatchedArguments batch) {
+ return sql.executeBatchReactive(transaction, query, batch);
+ }
+
+ @Override
+ public Publisher<Long> executeBatchReactive(@Nullable Transaction
transaction, Statement statement, BatchedArguments batch) {
+ return sql.executeBatchReactive(transaction, statement, batch);
+ }
+
+ @Override
+ public void executeScript(String query, @Nullable Object... arguments) {
+ sql.executeScript(query, arguments);
+ }
+
+ @Override
+ public CompletableFuture<Void> executeScriptAsync(String query, @Nullable
Object... arguments) {
+ return preventThreadHijack(sql.executeScriptAsync(query, arguments));
+ }
+
+ private <T> CompletableFuture<AsyncResultSet<T>>
preventThreadHijackForResultSet(CompletableFuture<AsyncResultSet<T>>
originalFuture) {
+ return preventThreadHijack(originalFuture)
+ .thenApply(resultSet -> new
AntiHijackAsyncResultSet<>(resultSet, asyncContinuationExecutor));
+ }
+
+ private <T> CompletableFuture<T> preventThreadHijack(CompletableFuture<T>
originalFuture) {
+ return PublicApiThreading.preventThreadHijack(originalFuture,
asyncContinuationExecutor);
+ }
+
+ @Override
+ public <T> T unwrap(Class<T> classToUnwrap) {
+ return classToUnwrap.cast(sql);
+ }
+}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
index b88c948ccf..69bd26fc9b 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
@@ -417,6 +417,7 @@ public class IgniteSqlImpl implements IgniteSql,
IgniteComponent {
/** {@inheritDoc} */
@Override
public CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction
transaction, Statement statement, BatchedArguments batch) {
+ // TODO: IGNITE-21872 - implement.
throw new UnsupportedOperationException("Not implemented yet.");
}