sashapolo commented on code in PR #3378:
URL: https://github.com/apache/ignite-3/pull/3378#discussion_r1517483786


##########
buildscripts/java-integration-test.gradle:
##########
@@ -30,6 +30,9 @@ testing {
                 implementation libs.junit5.api
                 implementation libs.junit5.impl
                 implementation libs.junit5.params
+
+                implementation libs.junit.pioneer

Review Comment:
   What's the point of putting empty lines around this dependency? No other dependency does that.



##########
modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java:
##########
@@ -566,7 +567,17 @@ private void processOperation(ChannelHandlerContext ctx, ClientMessageUnpacker i
             // Observable timestamp should be calculated after the operation is processed; reserve space, write later.
             int observableTimestampIdx = out.reserveLong();
 
-            CompletableFuture fut = processOperation(in, out, opCode, requestId);
+            CompletableFuture fut;
+
+            // Enclosing in 'internal call' to save resubmission to the async continuation thread pool on return. This will only
+            // work if the corresponding call (like an async KeyValueView method) is invoked in this same thread, but in most cases this
+            // will be true.
+            PublicApiThreading.startInternalCall();

Review Comment:
   Can we use `doInternalCall` here?
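
For context, a minimal sketch of the difference between a bare `startInternalCall()` and a `doInternalCall`-style wrapper. The class below is a hypothetical stand-in, not the real `PublicApiThreading`: the thread-local flag, the `endInternalCall` name, and the `Supplier`-based signature are all assumptions made for illustration. The point is only that a wrapper can guarantee the flag is lowered in a `finally` block, which a manual start/end pair leaves to the caller.

```java
import java.util.function.Supplier;

/** Hypothetical stand-in for PublicApiThreading; the real class may be shaped differently. */
class InternalCallSketch {
    // Assumption: the "internal call" marker is a per-thread flag.
    private static final ThreadLocal<Boolean> INTERNAL_CALL = ThreadLocal.withInitial(() -> false);

    static void startInternalCall() {
        INTERNAL_CALL.set(true);
    }

    // Assumed name; the diff only shows the start side.
    static void endInternalCall() {
        INTERNAL_CALL.set(false);
    }

    static boolean inInternalCall() {
        return INTERNAL_CALL.get();
    }

    /** Runs the call with the flag raised and always lowers it afterwards, even on exceptions. */
    static <T> T doInternalCall(Supplier<T> call) {
        startInternalCall();
        try {
            return call.get();
        } finally {
            endInternalCall();
        }
    }

    public static void main(String[] args) {
        boolean inside = doInternalCall(InternalCallSketch::inInternalCall);
        // Flag is visible inside the call and cleared after it.
        System.out.println(inside + " " + inInternalCall()); // prints "true false"
    }
}
```

This sketch does not handle nested internal calls (the inner call would clear the flag for the outer one); whether the real class needs that is not visible from the diff.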



##########
buildscripts/java-junit5.gradle:
##########
@@ -35,6 +35,8 @@ dependencies {
     testImplementation libs.junit5.api
     testImplementation libs.junit5.params
 
+    testImplementation libs.junit.pioneer

Review Comment:
   Same question: what's the point of the empty line around this dependency?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java:
##########
@@ -99,19 +115,40 @@ abstract class AbstractTableView<R> implements CriteriaQuerySource<R> {
      * @param <T> Future result type.
      * @return Future result.
      */
-    protected final <T> T sync(CompletableFuture<T> fut) {
+    protected final <T> T sync(Supplier<CompletableFuture<T>> fut) {
+        // Enclose in 'internal call' to prevent resubmission to the async continuation pool on future completion
+        // as such a resubmission is useless in the sync case and will just increase latency.
+        PublicApiThreading.startInternalCall();

Review Comment:
   Can we use `doInternalCall`?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java:
##########
@@ -155,8 +192,34 @@ private <T> CompletableFuture<T> withSchemaSync(@Nullable Transaction tx, @Nulla
                             }
                         }))
                 .thenCompose(identity());
+    }
 
-        return convertToPublicFuture(future);
+    /**
+     * Prevents Ignite internal threads from being hijacked by the user code. If that happened, the user code could have blocked
+     * Ignite threads deteriorating progress.
+     *
+     * <p>This is done by completing the future in the async continuation thread pool if it would have been completed in an Ignite thread.
+     * This does not happen for synchronous operations as it's impossible to hijack a thread using such operations.
+     *
+     * <p>The switch to the async continuation pool is also skipped when it's known that the call is made by other Ignite component
+     * and not by the user.
+     *
+     * @param originalFuture Operation future.
+     * @return Future that will be completed in the async continuation thread pool ({@link ForkJoinPool#commonPool()} by default).
+     */
+    protected final <T> CompletableFuture<T> preventThreadHijack(CompletableFuture<T> originalFuture) {
+        if (originalFuture.isDone() || PublicApiThreading.inInternalCall()) {
+            return originalFuture;
+        }
+
+        // The future is not complete yet, so it will be completed on an Ignite thread, so we need to complete the user-facing future
+        // in the continuation pool.
+
+        CompletableFuture<T> resultingFuture = new CompletableFuture<>();
+
+        originalFuture.whenComplete((res, ex) -> asyncContinuationExecutor.execute(() -> completeOrFailFuture(resultingFuture, res, ex)));

Review Comment:
   Do we need to use `execute` here? Will `whenCompleteAsync` work as well?
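
To make the two options concrete, here is a small self-contained sketch comparing them using only `java.util.concurrent`. The class and method names are made up for illustration, and a single-thread executor stands in for the async continuation pool. In both variants, a continuation chained before the original future completes ends up running on the executor thread, not on the thread that completed the original future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiFunction;

class ContinuationPoolSketch {
    /** Variant from the diff: hop to the executor by hand and complete a separate future there. */
    static <T> CompletableFuture<T> viaExecute(CompletableFuture<T> original, Executor executor) {
        CompletableFuture<T> result = new CompletableFuture<>();

        original.whenComplete((res, ex) -> executor.execute(() -> {
            if (ex != null) {
                result.completeExceptionally(ex);
            } else {
                result.complete(res);
            }
        }));

        return result;
    }

    /** Variant from the review comment: let CompletableFuture perform the hop. */
    static <T> CompletableFuture<T> viaWhenCompleteAsync(CompletableFuture<T> original, Executor executor) {
        return original.whenCompleteAsync((res, ex) -> { }, executor);
    }

    /** Returns the name of the thread that runs a continuation chained onto the wrapped future. */
    static String continuationThread(
            BiFunction<CompletableFuture<String>, Executor, CompletableFuture<String>> wrapper) {
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> new Thread(r, "continuation-pool"));
        try {
            CompletableFuture<String> original = new CompletableFuture<>();
            // Chain the continuation before completion, as a caller of an async API typically would.
            CompletableFuture<String> seenOn = wrapper.apply(original, pool)
                    .thenApply(v -> Thread.currentThread().getName());

            // Complete the original future from a thread that plays the role of an internal thread.
            new Thread(() -> original.complete("value"), "internal-thread").start();

            return seenOn.join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(continuationThread(ContinuationPoolSketch::viaExecute));
        System.out.println(continuationThread(ContinuationPoolSketch::viaWhenCompleteAsync));
    }
}
```

One behavioral difference to keep in mind: when the original future fails, the future returned by `whenCompleteAsync` completes with a `CompletionException` wrapping the original exception, whereas the hand-written variant passes the original exception through unchanged. Whether that matters here depends on how callers inspect the failure.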



##########
modules/table/src/integrationTest/java/org/apache/ignite/internal/threading/ItKvRecordApiThreadingTest.java:
##########
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.threading;
+
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.both;
+import static org.hamcrest.Matchers.hasProperty;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.startsWith;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiFunction;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.lang.IgniteSystemProperties;
+import org.apache.ignite.internal.streamer.SimplePublisher;
+import org.apache.ignite.internal.test.WatchListenerInhibitor;
+import org.apache.ignite.internal.testframework.TestIgnitionManager;
+import org.apache.ignite.internal.testframework.WithSystemProperty;
+import org.apache.ignite.internal.thread.IgniteThread;
+import org.apache.ignite.internal.thread.PublicApiThreading;
+import org.apache.ignite.sql.Session;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.RecordView;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.Tuple;
+import org.hamcrest.Matcher;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junitpioneer.jupiter.cartesian.CartesianTest;
+import org.junitpioneer.jupiter.cartesian.CartesianTest.Enum;
+
+@WithSystemProperty(key = IgniteSystemProperties.THREAD_ASSERTIONS_ENABLED, value = "false")
+@SuppressWarnings("resource")
+class ItKvRecordApiThreadingTest extends ClusterPerClassIntegrationTest {
+    private static final String TABLE_NAME = "test";
+    private static final int KEY = 1;
+
+    private static final Record KEY_RECORD = new Record(1, "");
+
+    @Override
+    protected int initialNodes() {
+        return 1;
+    }
+
+    @BeforeAll
+    void createTable() {
+        try (Session session = firstNode().sql().createSession()) {

Review Comment:
   I'm pretty sure there's already a method that creates a session for you: `sql`, `executeSql`, or something similar.



##########
modules/core/src/main/java/org/apache/ignite/internal/thread/PublicApiThreading.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.thread;
+
+import java.util.function.Supplier;
+
+/**
+ * Logic related to the threading concern of Public APIs.

Review Comment:
   Please add more context to this javadoc; it's not at all clear what this class does.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java:
##########
@@ -81,15 +88,24 @@ abstract class AbstractTableView<R> implements CriteriaQuerySource<R> {
      * @param schemaReg Schema registry.
      * @param sql Ignite SQL facade.
      * @param marshallers Marshallers provider.
+     * @param asyncContinuationExecutor Executor to which execution will be resubmitted when leaving asynchronous public API endpoints
+     *     (so as to prevent the user from stealing Ignite threads).

Review Comment:
   ```suggestion
        *     (to prevent the user from stealing Ignite threads).
   ```



##########
modules/table/src/integrationTest/java/org/apache/ignite/internal/threading/ItKvRecordApiThreadingTest.java:
##########
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.threading;
+
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.both;
+import static org.hamcrest.Matchers.hasProperty;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.startsWith;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiFunction;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.lang.IgniteSystemProperties;
+import org.apache.ignite.internal.streamer.SimplePublisher;
+import org.apache.ignite.internal.test.WatchListenerInhibitor;
+import org.apache.ignite.internal.testframework.TestIgnitionManager;
+import org.apache.ignite.internal.testframework.WithSystemProperty;
+import org.apache.ignite.internal.thread.IgniteThread;
+import org.apache.ignite.internal.thread.PublicApiThreading;
+import org.apache.ignite.sql.Session;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.RecordView;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.Tuple;
+import org.hamcrest.Matcher;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junitpioneer.jupiter.cartesian.CartesianTest;
+import org.junitpioneer.jupiter.cartesian.CartesianTest.Enum;
+
+@WithSystemProperty(key = IgniteSystemProperties.THREAD_ASSERTIONS_ENABLED, value = "false")
+@SuppressWarnings("resource")
+class ItKvRecordApiThreadingTest extends ClusterPerClassIntegrationTest {
+    private static final String TABLE_NAME = "test";
+    private static final int KEY = 1;
+
+    private static final Record KEY_RECORD = new Record(1, "");
+
+    @Override
+    protected int initialNodes() {
+        return 1;
+    }
+
+    @BeforeAll
+    void createTable() {
+        try (Session session = firstNode().sql().createSession()) {
+            session.execute(null, "CREATE TABLE " + TABLE_NAME + " (id INT PRIMARY KEY, val VARCHAR)");
+        }
+    }
+
+    private static IgniteImpl firstNode() {

Review Comment:
   Why do you need this method?



##########
modules/core/src/main/java/org/apache/ignite/internal/lang/IgniteExceptionMapperUtil.java:
##########
@@ -126,6 +127,11 @@ public static Throwable mapToPublicException(Throwable origin) {
      * @return New CompletableFuture.
      */
     public static <T> CompletableFuture<T> convertToPublicFuture(CompletableFuture<T> origin) {
+        if (isCompletedSuccessfully(origin)) {
+            // No need to translate exceptions.

Review Comment:
   Not related to this PR, but I think it's more correct to use `exceptionally` 
below
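
The code "below" is not part of the quoted hunk, so the following is only a guess at the shape being suggested: a sketch of exception translation that touches just the failure path via `exceptionally`. `mapToPublic` is a hypothetical stand-in for the real mapping logic, and the unwrapping of `CompletionException` is an assumption about what the translation needs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class ExceptionMappingSketch {
    /** Hypothetical mapper; stands in for whatever translates internal exceptions to public ones. */
    static Throwable mapToPublic(Throwable internal) {
        return new IllegalStateException("public: " + internal.getMessage(), internal);
    }

    /** Translates exceptions only when the future fails; successful results pass through untouched. */
    static <T> CompletableFuture<T> convert(CompletableFuture<T> origin) {
        return origin.exceptionally(err -> {
            // Dependent stages may see the failure wrapped in CompletionException, so unwrap it first.
            Throwable cause = (err instanceof CompletionException && err.getCause() != null) ? err.getCause() : err;

            throw new CompletionException(mapToPublic(cause));
        });
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("internal"));

        try {
            convert(failed).join();
        } catch (CompletionException e) {
            System.out.println(e.getCause().getClass().getSimpleName());
        }
    }
}
```

Compared with `handle` or `whenComplete`, `exceptionally` makes it explicit that the success path is left alone, which is presumably the sense in which the comment calls it "more correct".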



##########
modules/core/src/main/java/org/apache/ignite/internal/util/CompletableFutures.java:
##########
@@ -132,4 +132,19 @@ public static <T> CompletableFuture<T> completedOrFailedFuture(@Nullable T resul
             return completedFuture(result);
         }
     }
+
+    /**
+     * Completes a future either with an exception (if it's not {@code null} or normally.
+     *
+     * @param future Future to complete.
+     * @param result Normal completion result. Only used if the exception is {@code null}.
+     * @param ex Exception. If not {@code null}, the future will be failed with it, otherwise it will be completed normally.
+     */
+    public static <T> void completeOrFailFuture(CompletableFuture<T> future, @Nullable T result, @Nullable Throwable ex) {

Review Comment:
   There exists an `IgniteUtils#copyStateTo` method. Can it replace this one?
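
As an illustration of why a `copyStateTo`-style helper could cover the same ground, here is a self-contained sketch. The helper below is a hypothetical analogue written for this example; its signature (a factory returning a `BiConsumer` suitable for `whenComplete`) is an assumption and may not match the real method.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

class CopyStateSketch {
    /** Hypothetical analogue of the helper named in the comment: mirrors an outcome into the target future. */
    static <T> BiConsumer<T, Throwable> copyStateTo(CompletableFuture<? super T> target) {
        return (res, ex) -> {
            if (ex != null) {
                target.completeExceptionally(ex);
            } else {
                target.complete(res);
            }
        };
    }

    public static void main(String[] args) {
        CompletableFuture<String> source = new CompletableFuture<>();
        CompletableFuture<String> target = new CompletableFuture<>();

        // The consumer plugs straight into whenComplete; no separate three-argument helper is needed.
        source.whenComplete(copyStateTo(target));
        source.complete("done");

        System.out.println(target.join()); // prints "done"
    }
}
```

Under that assumption, the call in `preventThreadHijack` could invoke the consumer inside the executor task, for example `copyStateTo(resultingFuture).accept(res, ex)`, instead of introducing `completeOrFailFuture`.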



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
