This is an automated email from the ASF dual-hosted git repository.

korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new d68ae13df7 IGNITE-23446 Fix false-positive hasNext flag returned from sql cursor (#4662)
d68ae13df7 is described below

commit d68ae13df7e898139bb38c92d4f4334668fa98d3
Author: korlov42 <[email protected]>
AuthorDate: Tue Nov 5 11:26:29 2024 +0200

    IGNITE-23446 Fix false-positive hasNext flag returned from sql cursor (#4662)
---
 .../sql/engine/exec/rel/AsyncRootNode.java         |  26 ++++-
 .../sql/engine/exec/rel/AsyncRootNodeTest.java     | 122 +++++++++++++++++++++
 2 files changed, 143 insertions(+), 5 deletions(-)

diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java
index 26e5fb7bfd..c8f7322be1 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java
@@ -200,6 +200,7 @@ public class AsyncRootNode<InRowT, OutRowT> implements Downstream<InRowT>, Async
 
         if (waiting == 0) {
             try {
+                //noinspection NestedAssignment
                 source.request(waiting = IN_BUFFER_SIZE);
             } catch (Exception ex) {
                 onError(ex);
@@ -230,19 +231,30 @@ public class AsyncRootNode<InRowT, OutRowT> implements Downstream<InRowT>, Async
             currentReq.buff.add(buff.remove());
         }
 
-        boolean hasMoreRows = waiting != -1 || !buff.isEmpty();
+        HasMore hasMore;
+        if (waiting == -1 && buff.isEmpty()) {
+            hasMore = HasMore.NO;
+        } else if (!buff.isEmpty()) {
+            hasMore = HasMore.YES;
+        } else {
+            hasMore = HasMore.UNCERTAIN;
+        }
 
-        if (currentReq.buff.size() == currentReq.requested || !hasMoreRows) {
+        // Even if demand is fulfilled we should not complete request
+        // if we are not sure whether there are more rows or not to
+        // avoid returning false-positive result.
+        if ((currentReq.buff.size() == currentReq.requested && hasMore != HasMore.UNCERTAIN) || hasMore == HasMore.NO) {
             // use poll() instead of remove() because latter throws exception when queue is empty,
             // and queue may be cleared concurrently by cancellation
             pendingRequests.poll();
 
-            currentReq.fut.complete(new BatchedResult<>(currentReq.buff, hasMoreRows));
+            currentReq.fut.complete(new BatchedResult<>(currentReq.buff, hasMore == HasMore.YES));
         }
 
-        if (waiting == 0) {
+        if (waiting == 0 && buff.isEmpty()) {
+            //noinspection NestedAssignment
             source.request(waiting = IN_BUFFER_SIZE);
-        } else if (!hasMoreRows) {
+        } else if (hasMore == HasMore.NO) {
             closeAsync();
         }
     }
@@ -293,4 +305,8 @@ public class AsyncRootNode<InRowT, OutRowT> implements Downstream<InRowT>, Async
             this.buff = new ArrayList<>(requested);
         }
     }
+
+    private enum HasMore {
+        YES, NO, UNCERTAIN
+    }
 }
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNodeTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNodeTest.java
index d66ee0bc86..d15c10ad59 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNodeTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNodeTest.java
@@ -20,25 +20,40 @@ package org.apache.ignite.internal.sql.engine.exec.rel;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.fail;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
+import org.apache.ignite.internal.lang.InternalTuple;
+import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.BinaryTupleSchema;
 import org.apache.ignite.internal.schema.BinaryTupleSchema.Element;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler;
+import org.apache.ignite.internal.sql.engine.exec.ScannableDataSource;
 import org.apache.ignite.internal.sql.engine.exec.SqlRowHandler;
 import org.apache.ignite.internal.sql.engine.exec.SqlRowHandler.RowWrapper;
 import org.apache.ignite.internal.sql.engine.exec.row.RowSchema;
 import org.apache.ignite.internal.type.NativeTypes;
+import org.apache.ignite.internal.util.AsyncCursor.BatchedResult;
 import org.junit.jupiter.api.Test;
 
 /**
  * Tests to verify {@link AsyncRootNode}.
  */
+@SuppressWarnings("NumericCastThatLosesPrecision")
 class AsyncRootNodeTest extends AbstractExecutionTest<RowWrapper> {
     private static final RowSchema SINGLE_INT_ROW_SCHEMA = RowSchema.builder()
             .addField(NativeTypes.INT32)
@@ -78,8 +93,115 @@ class AsyncRootNodeTest extends AbstractExecutionTest<RowWrapper> {
         assertFalse(prefetchFuture.isDone());
     }
 
+    /**
+     * Test to make sure root node won't return false-positive result in {@link BatchedResult#hasMore()}.
+     *
+     * <p>Such problem may arise when incoming request drains the internal buffer of the node empty, while source node has no more data
+     * left yet {@link Downstream#end()} has not been called.
+     *
+     * <p>Test below is a simplified reproducer: data source has exactly the same number of rows that the size of the buffer.
+     *
+     * <p>Another scenario may involve one exchange: one remote should fulfill the demand with all the rows it has, while another remote
+     * doesn't have data at all, but batch message from that remote is delayed.
+     */
+    @Test
+    void ensureNodeWontReturnFalsePositiveHasMoreFlag() {
+        ExecutionContext<RowWrapper> context = executionContext();
+        TestDataSource dataSource = new TestDataSource();
+
+        DataSourceScanNode<RowWrapper> dataSourceScanNode = new DataSourceScanNode<>(
+                context,
+                rowHandler().factory(SINGLE_INT_ROW_SCHEMA),
+                SINGLE_INT_SCHEMA,
+                dataSource,
+                null,
+                null,
+                null
+        );
+
+        AsyncRootNode<RowWrapper, RowWrapper> rootNode = new AsyncRootNode<>(dataSourceScanNode, Function.identity());
+        dataSourceScanNode.onRegister(rootNode);
+
+        // trigger prefetch
+        await(context.submit(rootNode::startPrefetch, err -> {}));
+
+        // wait for datasource to emit rows
+        await(dataSource.demandFulfilled);
+
+        int requested = (int) dataSource.wasRequested.get();
+
+        // request the same amount to empty the buffer of RootNode
+        CompletableFuture<BatchedResult<RowWrapper>> result = rootNode.requestNextAsync(requested);
+
+        try {
+            result.get(1, TimeUnit.SECONDS);
+
+            fail("Future should not be completed, because it must wait for TestDataSource#endDelay to be triggered");
+        } catch (ExecutionException | InterruptedException e) {
+            throw new RuntimeException(e);
+        } catch (TimeoutException ignored) {
+            // this is expected
+        }
+
+        dataSource.endDelay.complete(null);
+
+        BatchedResult<?> batch = await(result);
+        assertFalse(batch.hasMore());
+    }
+
     @Override
     protected RowHandler<RowWrapper> rowHandler() {
         return SqlRowHandler.INSTANCE;
     }
+
+    private static class TestDataSource implements ScannableDataSource {
+        final AtomicLong wasRequested = new AtomicLong();
+        final CompletableFuture<Void> endDelay = new CompletableFuture<>();
+        final CompletableFuture<Void> demandFulfilled = new CompletableFuture<>();
+
+        @Override
+        public Publisher<InternalTuple> scan() {
+            return subscriber -> {
+                Subscription subscription = new TestSubscription(subscriber);
+
+                subscriber.onSubscribe(subscription);
+            };
+        }
+
+        private class TestSubscription implements Subscription {
+            private final AtomicBoolean requested = new AtomicBoolean();
+
+            private final Subscriber<? super InternalTuple> subscriber;
+
+            private TestSubscription(Subscriber<? super InternalTuple> subscriber) {
+                this.subscriber = subscriber;
+            }
+
+            @Override
+            public void request(long n) {
+                if (requested.compareAndSet(false, true)) {
+                    wasRequested.set(n);
+
+                    IntStream.range(0, (int) n)
+                            .mapToObj(AsyncRootNodeTest::createTuple)
+                            .forEach(subscriber::onNext);
+
+                    demandFulfilled.complete(null);
+                } else {
+                    endDelay.thenRun(subscriber::onComplete);
+                }
+            }
+
+            @Override
+            public void cancel() {
+            }
+        }
+    }
+
+    private static InternalTuple createTuple(int value) {
+        return new BinaryTuple(1, new BinaryTupleBuilder(1, 4)
+                .appendInt(value)
+                .build()
+        );
+    }
 }

Reply via email to