sashapolo commented on code in PR #3378:
URL: https://github.com/apache/ignite-3/pull/3378#discussion_r1517483786
##########
buildscripts/java-integration-test.gradle:
##########
@@ -30,6 +30,9 @@ testing {
implementation libs.junit5.api
implementation libs.junit5.impl
implementation libs.junit5.params
+
+ implementation libs.junit.pioneer
Review Comment:
What's the point of putting empty lines around this dependency? No other
dependency does that.
##########
modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java:
##########
@@ -566,7 +567,17 @@ private void processOperation(ChannelHandlerContext ctx, ClientMessageUnpacker i
// Observable timestamp should be calculated after the operation is processed; reserve space, write later.
int observableTimestampIdx = out.reserveLong();
- CompletableFuture fut = processOperation(in, out, opCode, requestId);
+ CompletableFuture fut;
+
+ // Enclosing in 'internal call' to save resubmission to the async continuation thread pool on return. This will only
+ // work if the corresponding call (like an async KeyValueView method) is invoked in this same thread, but in most cases this
+ // will be true.
+ PublicApiThreading.startInternalCall();
Review Comment:
Can we use `doInternalCall` here?
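
To illustrate why a wrapper is preferable to a bare `startInternalCall()`: a wrapper can restore the flag in a `finally` block, so the flag cannot leak if the wrapped call throws. A minimal sketch, assuming the flag is a thread-local boolean and the wrapper takes a `Supplier`; the class `InternalCallSketch` is hypothetical and is not the real `PublicApiThreading`, whose signatures should be checked in the PR:

```java
import java.util.function.Supplier;

/** Hypothetical stand-in for PublicApiThreading; not the real Ignite class. */
final class InternalCallSketch {
    /** Assumption: the 'internal call' marker is a per-thread boolean flag. */
    private static final ThreadLocal<Boolean> INTERNAL_CALL = ThreadLocal.withInitial(() -> false);

    private InternalCallSketch() {
    }

    /** Returns whether the current thread is inside an internal call. */
    static boolean inInternalCall() {
        return INTERNAL_CALL.get();
    }

    /** Runs the action with the flag set and restores the previous state even if the action throws. */
    static <T> T doInternalCall(Supplier<T> action) {
        boolean previous = INTERNAL_CALL.get();

        INTERNAL_CALL.set(true);

        try {
            return action.get();
        } finally {
            INTERNAL_CALL.set(previous);
        }
    }
}
```

With such a wrapper the call site would shrink to something like `fut = doInternalCall(() -> processOperation(in, out, opCode, requestId))`, with no separate "end" call to forget.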
##########
buildscripts/java-junit5.gradle:
##########
@@ -35,6 +35,8 @@ dependencies {
testImplementation libs.junit5.api
testImplementation libs.junit5.params
+ testImplementation libs.junit.pioneer
Review Comment:
Same question as for the `junit.pioneer` dependency in `java-integration-test.gradle`.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java:
##########
@@ -99,19 +115,40 @@ abstract class AbstractTableView<R> implements CriteriaQuerySource<R> {
* @param <T> Future result type.
* @return Future result.
*/
- protected final <T> T sync(CompletableFuture<T> fut) {
+ protected final <T> T sync(Supplier<CompletableFuture<T>> fut) {
+ // Enclose in 'internal call' to prevent resubmission to the async continuation pool on future completion
+ // as such a resubmission is useless in the sync case and will just increase latency.
+ PublicApiThreading.startInternalCall();
Review Comment:
Can we use `doInternalCall`?
##########
modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java:
##########
@@ -155,8 +192,34 @@ private <T> CompletableFuture<T> withSchemaSync(@Nullable Transaction tx, @Nulla
}
}))
.thenCompose(identity());
+ }
- return convertToPublicFuture(future);
+ /**
+ * Prevents Ignite internal threads from being hijacked by the user code. If that happened, the user code could have blocked
+ * Ignite threads deteriorating progress.
+ *
+ * <p>This is done by completing the future in the async continuation thread pool if it would have been completed in an Ignite thread.
+ * This does not happen for synchronous operations as it's impossible to hijack a thread using such operations.
+ *
+ * <p>The switch to the async continuation pool is also skipped when it's known that the call is made by other Ignite component
+ * and not by the user.
+ *
+ * @param originalFuture Operation future.
+ * @return Future that will be completed in the async continuation thread pool ({@link ForkJoinPool#commonPool()} by default).
+ */
+ protected final <T> CompletableFuture<T> preventThreadHijack(CompletableFuture<T> originalFuture) {
+ if (originalFuture.isDone() || PublicApiThreading.inInternalCall()) {
+ return originalFuture;
+ }
+
+ // The future is not complete yet, so it will be completed on an Ignite thread, so we need to complete the user-facing future
+ // in the continuation pool.
+
+ CompletableFuture<T> resultingFuture = new CompletableFuture<>();
+
+ originalFuture.whenComplete((res, ex) -> asyncContinuationExecutor.execute(() -> completeOrFailFuture(resultingFuture, res, ex)));
Review Comment:
Do we need to use `execute` here? Will `whenCompleteAsync` work as well?
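
For comparison, here are the two variants side by side, using only JDK types. This is a sketch, not the PR's code: `ContinuationSketch` and its `complete` helper are hypothetical names, and the helper stands in for `completeOrFailFuture`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Hypothetical illustration of the two ways to move completion onto an executor. */
final class ContinuationSketch {
    private ContinuationSketch() {
    }

    /** Variant from the diff: a synchronous callback that submits the completion to the executor. */
    static <T> CompletableFuture<T> viaExecute(CompletableFuture<T> original, Executor executor) {
        CompletableFuture<T> result = new CompletableFuture<>();

        original.whenComplete((res, ex) -> executor.execute(() -> complete(result, res, ex)));

        return result;
    }

    /** Suggested variant: let whenCompleteAsync run the callback on the executor. */
    static <T> CompletableFuture<T> viaWhenCompleteAsync(CompletableFuture<T> original, Executor executor) {
        CompletableFuture<T> result = new CompletableFuture<>();

        original.whenCompleteAsync((res, ex) -> complete(result, res, ex), executor);

        return result;
    }

    /** Stand-in for completeOrFailFuture: fails the future if ex is not null, otherwise completes it. */
    private static <T> void complete(CompletableFuture<T> future, T res, Throwable ex) {
        if (ex != null) {
            future.completeExceptionally(ex);
        } else {
            future.complete(res);
        }
    }
}
```

In both variants the user-facing future is completed by a task that runs on the executor, so dependent stages attached before completion run there as well; the second form only drops the nested lambda.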
##########
modules/table/src/integrationTest/java/org/apache/ignite/internal/threading/ItKvRecordApiThreadingTest.java:
##########
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.threading;
+
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.both;
+import static org.hamcrest.Matchers.hasProperty;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.startsWith;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiFunction;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.lang.IgniteSystemProperties;
+import org.apache.ignite.internal.streamer.SimplePublisher;
+import org.apache.ignite.internal.test.WatchListenerInhibitor;
+import org.apache.ignite.internal.testframework.TestIgnitionManager;
+import org.apache.ignite.internal.testframework.WithSystemProperty;
+import org.apache.ignite.internal.thread.IgniteThread;
+import org.apache.ignite.internal.thread.PublicApiThreading;
+import org.apache.ignite.sql.Session;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.RecordView;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.Tuple;
+import org.hamcrest.Matcher;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junitpioneer.jupiter.cartesian.CartesianTest;
+import org.junitpioneer.jupiter.cartesian.CartesianTest.Enum;
+
+@WithSystemProperty(key = IgniteSystemProperties.THREAD_ASSERTIONS_ENABLED, value = "false")
+@SuppressWarnings("resource")
+class ItKvRecordApiThreadingTest extends ClusterPerClassIntegrationTest {
+ private static final String TABLE_NAME = "test";
+ private static final int KEY = 1;
+
+ private static final Record KEY_RECORD = new Record(1, "");
+
+ @Override
+ protected int initialNodes() {
+ return 1;
+ }
+
+ @BeforeAll
+ void createTable() {
+ try (Session session = firstNode().sql().createSession()) {
Review Comment:
I'm pretty sure there's a method that already creates a session for you,
`sql` or `executeSql` or something
##########
modules/core/src/main/java/org/apache/ignite/internal/thread/PublicApiThreading.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.thread;
+
+import java.util.function.Supplier;
+
+/**
+ * Logic related to the threading concern of Public APIs.
Review Comment:
Please add more context to this javadoc; it's not at all clear what
this class does.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java:
##########
@@ -81,15 +88,24 @@ abstract class AbstractTableView<R> implements CriteriaQuerySource<R> {
* @param schemaReg Schema registry.
* @param sql Ignite SQL facade.
* @param marshallers Marshallers provider.
+ * @param asyncContinuationExecutor Executor to which execution will be resubmitted when leaving asynchronous public API endpoints
+ * (so as to prevent the user from stealing Ignite threads).
Review Comment:
```suggestion
* (to prevent the user from stealing Ignite threads).
```
##########
modules/table/src/integrationTest/java/org/apache/ignite/internal/threading/ItKvRecordApiThreadingTest.java:
##########
@@ -0,0 +1,390 @@
+package org.apache.ignite.internal.threading;
+
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.both;
+import static org.hamcrest.Matchers.hasProperty;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.startsWith;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiFunction;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.lang.IgniteSystemProperties;
+import org.apache.ignite.internal.streamer.SimplePublisher;
+import org.apache.ignite.internal.test.WatchListenerInhibitor;
+import org.apache.ignite.internal.testframework.TestIgnitionManager;
+import org.apache.ignite.internal.testframework.WithSystemProperty;
+import org.apache.ignite.internal.thread.IgniteThread;
+import org.apache.ignite.internal.thread.PublicApiThreading;
+import org.apache.ignite.sql.Session;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.RecordView;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.Tuple;
+import org.hamcrest.Matcher;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junitpioneer.jupiter.cartesian.CartesianTest;
+import org.junitpioneer.jupiter.cartesian.CartesianTest.Enum;
+
+@WithSystemProperty(key = IgniteSystemProperties.THREAD_ASSERTIONS_ENABLED, value = "false")
+@SuppressWarnings("resource")
+class ItKvRecordApiThreadingTest extends ClusterPerClassIntegrationTest {
+ private static final String TABLE_NAME = "test";
+ private static final int KEY = 1;
+
+ private static final Record KEY_RECORD = new Record(1, "");
+
+ @Override
+ protected int initialNodes() {
+ return 1;
+ }
+
+ @BeforeAll
+ void createTable() {
+ try (Session session = firstNode().sql().createSession()) {
+ session.execute(null, "CREATE TABLE " + TABLE_NAME + " (id INT PRIMARY KEY, val VARCHAR)");
+ }
+ }
+
+ private static IgniteImpl firstNode() {
Review Comment:
Why do you need this method?
##########
modules/core/src/main/java/org/apache/ignite/internal/lang/IgniteExceptionMapperUtil.java:
##########
@@ -126,6 +127,11 @@ public static Throwable mapToPublicException(Throwable origin) {
* @return New CompletableFuture.
*/
public static <T> CompletableFuture<T> convertToPublicFuture(CompletableFuture<T> origin) {
+ if (isCompletedSuccessfully(origin)) {
+ // No need to translate exceptions.
Review Comment:
Not related to this PR, but I think it's more correct to use `exceptionally`
below
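
A rough sketch of what an `exceptionally`-based translation could look like, using only JDK types. It does not reproduce the existing body of `convertToPublicFuture`; `ExceptionallySketch`, `translateFailures`, and the `mapper` parameter are hypothetical and stand in for the mapping to public exceptions:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Hypothetical illustration of translating failures with exceptionally(). */
final class ExceptionallySketch {
    private ExceptionallySketch() {
    }

    /**
     * Returns a future that keeps successful results as they are and rethrows failures
     * after passing them through the mapper (an assumed stand-in for the public-exception mapping).
     */
    static <T> CompletableFuture<T> translateFailures(
            CompletableFuture<T> origin,
            Function<Throwable, RuntimeException> mapper
    ) {
        // The callback is invoked only on failure, so no null check on the exception is needed.
        return origin.exceptionally(ex -> {
            throw mapper.apply(ex);
        });
    }
}
```

The appeal of `exceptionally` is that the success path is not touched at all, which states the intent ("only translate exceptions") directly.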
##########
modules/core/src/main/java/org/apache/ignite/internal/util/CompletableFutures.java:
##########
@@ -132,4 +132,19 @@ public static <T> CompletableFuture<T> completedOrFailedFuture(@Nullable T resul
return completedFuture(result);
}
}
+
+ /**
+ * Completes a future either with an exception (if it's not {@code null}) or normally.
+ *
+ * @param future Future to complete.
+ * @param result Normal completion result. Only used if the exception is {@code null}.
+ * @param ex Exception. If not {@code null}, the future will be failed with it, otherwise it will be completed normally.
+ */
+ public static <T> void completeOrFailFuture(CompletableFuture<T> future, @Nullable T result, @Nullable Throwable ex) {
Review Comment:
There exists an `IgniteUtils#copyStateTo` method. Can it replace this one?
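
To make the question concrete, a helper of this kind is typically shaped as a `BiConsumer` that can be passed straight to `whenComplete`. The sketch below is an assumption about that shape, not a copy of the Ignite method; `CopyStateSketch` is a hypothetical name and the real signature should be checked before relying on it:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

/** Hypothetical illustration of a copy-state helper; not the Ignite implementation. */
final class CopyStateSketch {
    private CopyStateSketch() {
    }

    /** Returns a callback that transfers the outcome (result or exception) to the target future. */
    static <T> BiConsumer<T, Throwable> copyStateTo(CompletableFuture<? super T> target) {
        return (res, ex) -> {
            if (ex != null) {
                target.completeExceptionally(ex);
            } else {
                target.complete(res);
            }
        };
    }
}
```

If the existing method has this shape, the call site in `preventThreadHijack` could become something like `executor.execute(() -> copyStateTo(resultingFuture).accept(res, ex))`, and the new `completeOrFailFuture` would not be needed.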