This is an automated email from the ASF dual-hosted git repository.
korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new d68ae13df7 IGNITE-23446 Fix false-positive hasNext flag returned from
sql cursor (#4662)
d68ae13df7 is described below
commit d68ae13df7e898139bb38c92d4f4334668fa98d3
Author: korlov42 <[email protected]>
AuthorDate: Tue Nov 5 11:26:29 2024 +0200
IGNITE-23446 Fix false-positive hasNext flag returned from sql cursor
(#4662)
---
.../sql/engine/exec/rel/AsyncRootNode.java | 26 ++++-
.../sql/engine/exec/rel/AsyncRootNodeTest.java | 122 +++++++++++++++++++++
2 files changed, 143 insertions(+), 5 deletions(-)
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java
index 26e5fb7bfd..c8f7322be1 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java
@@ -200,6 +200,7 @@ public class AsyncRootNode<InRowT, OutRowT> implements
Downstream<InRowT>, Async
if (waiting == 0) {
try {
+ //noinspection NestedAssignment
source.request(waiting = IN_BUFFER_SIZE);
} catch (Exception ex) {
onError(ex);
@@ -230,19 +231,30 @@ public class AsyncRootNode<InRowT, OutRowT> implements
Downstream<InRowT>, Async
currentReq.buff.add(buff.remove());
}
- boolean hasMoreRows = waiting != -1 || !buff.isEmpty();
+ HasMore hasMore;
+ if (waiting == -1 && buff.isEmpty()) {
+ hasMore = HasMore.NO;
+ } else if (!buff.isEmpty()) {
+ hasMore = HasMore.YES;
+ } else {
+ hasMore = HasMore.UNCERTAIN;
+ }
- if (currentReq.buff.size() == currentReq.requested || !hasMoreRows) {
+ // Even if the demand is fulfilled, we should not complete the request
+ // if we are not sure whether there are more rows or not, to
+ // avoid returning a false-positive result.
+ if ((currentReq.buff.size() == currentReq.requested && hasMore !=
HasMore.UNCERTAIN) || hasMore == HasMore.NO) {
// use poll() instead of remove() because latter throws exception
when queue is empty,
// and queue may be cleared concurrently by cancellation
pendingRequests.poll();
- currentReq.fut.complete(new BatchedResult<>(currentReq.buff,
hasMoreRows));
+ currentReq.fut.complete(new BatchedResult<>(currentReq.buff,
hasMore == HasMore.YES));
}
- if (waiting == 0) {
+ if (waiting == 0 && buff.isEmpty()) {
+ //noinspection NestedAssignment
source.request(waiting = IN_BUFFER_SIZE);
- } else if (!hasMoreRows) {
+ } else if (hasMore == HasMore.NO) {
closeAsync();
}
}
@@ -293,4 +305,8 @@ public class AsyncRootNode<InRowT, OutRowT> implements
Downstream<InRowT>, Async
this.buff = new ArrayList<>(requested);
}
}
+
+ private enum HasMore {
+ YES, NO, UNCERTAIN
+ }
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNodeTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNodeTest.java
index d66ee0bc86..d15c10ad59 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNodeTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNodeTest.java
@@ -20,25 +20,40 @@ package org.apache.ignite.internal.sql.engine.exec.rel;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.fail;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
+import org.apache.ignite.internal.lang.InternalTuple;
+import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.BinaryTupleSchema;
import org.apache.ignite.internal.schema.BinaryTupleSchema.Element;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite.internal.sql.engine.exec.RowHandler;
+import org.apache.ignite.internal.sql.engine.exec.ScannableDataSource;
import org.apache.ignite.internal.sql.engine.exec.SqlRowHandler;
import org.apache.ignite.internal.sql.engine.exec.SqlRowHandler.RowWrapper;
import org.apache.ignite.internal.sql.engine.exec.row.RowSchema;
import org.apache.ignite.internal.type.NativeTypes;
+import org.apache.ignite.internal.util.AsyncCursor.BatchedResult;
import org.junit.jupiter.api.Test;
/**
* Tests to verify {@link AsyncRootNode}.
*/
+@SuppressWarnings("NumericCastThatLosesPrecision")
class AsyncRootNodeTest extends AbstractExecutionTest<RowWrapper> {
private static final RowSchema SINGLE_INT_ROW_SCHEMA = RowSchema.builder()
.addField(NativeTypes.INT32)
@@ -78,8 +93,115 @@ class AsyncRootNodeTest extends
AbstractExecutionTest<RowWrapper> {
assertFalse(prefetchFuture.isDone());
}
+ /**
+ * Test to make sure the root node won't return a false-positive result in
{@link BatchedResult#hasMore()}.
+ *
+ * <p>Such a problem may arise when an incoming request drains the internal
buffer of the node empty, while the source node has no more data
+ * left, yet {@link Downstream#end()} has not been called.
+ *
+ * <p>The test below is a simplified reproducer: the data source has exactly the
same number of rows as the size of the buffer.
+ *
+ * <p>Another scenario may involve one exchange: one remote should fulfill
the demand with all the rows it has, while another remote
+ * doesn't have data at all, but the batch message from that remote is delayed.
+ */
+ @Test
+ void ensureNodeWontReturnFalsePositiveHasMoreFlag() {
+ ExecutionContext<RowWrapper> context = executionContext();
+ TestDataSource dataSource = new TestDataSource();
+
+ DataSourceScanNode<RowWrapper> dataSourceScanNode = new
DataSourceScanNode<>(
+ context,
+ rowHandler().factory(SINGLE_INT_ROW_SCHEMA),
+ SINGLE_INT_SCHEMA,
+ dataSource,
+ null,
+ null,
+ null
+ );
+
+ AsyncRootNode<RowWrapper, RowWrapper> rootNode = new
AsyncRootNode<>(dataSourceScanNode, Function.identity());
+ dataSourceScanNode.onRegister(rootNode);
+
+ // trigger prefetch
+ await(context.submit(rootNode::startPrefetch, err -> {}));
+
+ // wait for datasource to emit rows
+ await(dataSource.demandFulfilled);
+
+ int requested = (int) dataSource.wasRequested.get();
+
+ // request the same amount to empty the buffer of RootNode
+ CompletableFuture<BatchedResult<RowWrapper>> result =
rootNode.requestNextAsync(requested);
+
+ try {
+ result.get(1, TimeUnit.SECONDS);
+
+ fail("Future should not be completed, because it must wait for
TestDataSource#endDelay to be triggered");
+ } catch (ExecutionException | InterruptedException e) {
+ throw new RuntimeException(e);
+ } catch (TimeoutException ignored) {
+ // this is expected
+ }
+
+ dataSource.endDelay.complete(null);
+
+ BatchedResult<?> batch = await(result);
+ assertFalse(batch.hasMore());
+ }
+
@Override
protected RowHandler<RowWrapper> rowHandler() {
return SqlRowHandler.INSTANCE;
}
+
+ private static class TestDataSource implements ScannableDataSource {
+ final AtomicLong wasRequested = new AtomicLong();
+ final CompletableFuture<Void> endDelay = new CompletableFuture<>();
+ final CompletableFuture<Void> demandFulfilled = new
CompletableFuture<>();
+
+ @Override
+ public Publisher<InternalTuple> scan() {
+ return subscriber -> {
+ Subscription subscription = new TestSubscription(subscriber);
+
+ subscriber.onSubscribe(subscription);
+ };
+ }
+
+ private class TestSubscription implements Subscription {
+ private final AtomicBoolean requested = new AtomicBoolean();
+
+ private final Subscriber<? super InternalTuple> subscriber;
+
+ private TestSubscription(Subscriber<? super InternalTuple>
subscriber) {
+ this.subscriber = subscriber;
+ }
+
+ @Override
+ public void request(long n) {
+ if (requested.compareAndSet(false, true)) {
+ wasRequested.set(n);
+
+ IntStream.range(0, (int) n)
+ .mapToObj(AsyncRootNodeTest::createTuple)
+ .forEach(subscriber::onNext);
+
+ demandFulfilled.complete(null);
+ } else {
+ endDelay.thenRun(subscriber::onComplete);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ }
+ }
+ }
+
+ private static InternalTuple createTuple(int value) {
+ return new BinaryTuple(1, new BinaryTupleBuilder(1, 4)
+ .appendInt(value)
+ .build()
+ );
+ }
}