This is an automated email from the ASF dual-hosted git repository. ppa pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 87792091f1 IGNITE-20454 Sql. Added a callback that is notified when data prefetching is complete (#2674) 87792091f1 is described below commit 87792091f1fef453c191dd93e5ea24a47f82dfea Author: Pavel Pereslegin <xxt...@gmail.com> AuthorDate: Thu Oct 19 13:54:44 2023 +0300 IGNITE-20454 Sql. Added a callback that is notified when data prefetching is complete (#2674) --- .../internal/sql/engine/QueryPrefetchCallback.java | 46 ++++++++ .../sql/engine/exec/ExecutionServiceImpl.java | 24 +++- .../sql/engine/exec/rel/AsyncRootNode.java | 28 ++++- .../internal/sql/engine/util/BaseQueryContext.java | 20 +++- .../sql/engine/exec/ExecutionServiceImplTest.java | 123 ++++++++++++++++++++- 5 files changed, 230 insertions(+), 11 deletions(-) diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryPrefetchCallback.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryPrefetchCallback.java new file mode 100644 index 0000000000..7ef6a7604c --- /dev/null +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryPrefetchCallback.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.sql.engine; + +import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler; +import org.apache.ignite.internal.sql.engine.exec.rel.AsyncRootNode; +import org.apache.ignite.internal.sql.engine.prepare.ddl.DdlCommand; +import org.jetbrains.annotations.Nullable; + +/** + * Callback that is invoked when the query finishes prefetching. It is designed + * to allow sequential execution of SQL statements that are dependent on each other. + * + * <ol> + * <li>For {@code DML} queries, it is called after the cursor has finished prefetching + * the initial batch of rows (see {@link AsyncRootNode#startPrefetch}).</li> + * <li>For {@code DDL} queries, it is called after the corresponding DDL + * command has completed (see {@link DdlCommandHandler#handle(DdlCommand)}).</li> + * </ol> + * + * <p>This callback is invoked asynchronously in the "{@code execution pool}". + */ +@FunctionalInterface +public interface QueryPrefetchCallback { + /** + * Called when the query finishes prefetching. + * + * @param ex Exceptional completion cause, or {@code null} if prefetch completed successfully. 
+ */ + void onPrefetchComplete(@Nullable Throwable ex); +} diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java index aa4c88cb73..ca0be42a80 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java @@ -49,6 +49,7 @@ import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.sql.engine.NodeLeftException; import org.apache.ignite.internal.sql.engine.QueryCancelledException; +import org.apache.ignite.internal.sql.engine.QueryPrefetchCallback; import org.apache.ignite.internal.sql.engine.SqlQueryType; import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler; import org.apache.ignite.internal.sql.engine.exec.mapping.FragmentDescription; @@ -268,9 +269,9 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve case QUERY: return executeQuery(tx, ctx, (MultiStepPlan) plan); case EXPLAIN: - return executeExplain((ExplainPlan) plan); + return executeExplain((ExplainPlan) plan, ctx.prefetchCallback()); case DDL: - return executeDdl((DdlPlan) plan); + return executeDdl((DdlPlan) plan, ctx.prefetchCallback()); default: throw new AssertionError("Unexpected query type: " + plan); @@ -288,13 +289,17 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve return mgr.close(true); } - private AsyncCursor<List<Object>> executeDdl(DdlPlan plan) { + private AsyncCursor<List<Object>> executeDdl(DdlPlan plan, @Nullable QueryPrefetchCallback callback) { CompletableFuture<Iterator<List<Object>>> ret = ddlCmdHnd.handle(plan.command()) .thenApply(applied -> List.of(List.<Object>of(applied)).iterator()) 
.exceptionally(th -> { throw convertDdlException(th); }); + if (callback != null) { + ret.whenCompleteAsync((res, err) -> callback.onPrefetchComplete(err), taskExecutor); + } + return new AsyncWrapper<>(ret, Runnable::run); } @@ -315,9 +320,13 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve return (e instanceof RuntimeException) ? (RuntimeException) e : new IgniteInternalException(INTERNAL_ERR, e); } - private AsyncCursor<List<Object>> executeExplain(ExplainPlan plan) { + private AsyncCursor<List<Object>> executeExplain(ExplainPlan plan, @Nullable QueryPrefetchCallback callback) { List<List<Object>> res = List.of(List.of(plan.plan())); + if (callback != null) { + taskExecutor.execute(() -> callback.onPrefetchComplete(null)); + } + return new AsyncWrapper<>(res.iterator()); } @@ -555,7 +564,12 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve }); node.onRegister(rootNode); - rootNode.prefetch(); + CompletableFuture<Void> prefetchFut = rootNode.startPrefetch(); + QueryPrefetchCallback callback = ctx.prefetchCallback(); + + if (callback != null) { + prefetchFut.whenCompleteAsync((res, err) -> callback.onPrefetchComplete(err), taskExecutor); + } root.complete(rootNode); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java index d21c541c79..b1350a8227 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java @@ -31,6 +31,7 @@ import java.util.function.Function; import org.apache.ignite.internal.sql.engine.QueryCancelledException; import org.apache.ignite.internal.util.AsyncCursor; import org.apache.ignite.sql.CursorClosedException; +import org.jetbrains.annotations.Nullable; /** * An 
async iterator over the execution tree. @@ -56,6 +57,8 @@ public class AsyncRootNode<InRowT, OutRowT> implements Downstream<InRowT>, Async private final Queue<PendingRequest<OutRowT>> pendingRequests = new ConcurrentLinkedQueue<>(); + private final CompletableFuture<Void> prefetchFut = new CompletableFuture<>(); + private volatile boolean closed = false; /** @@ -169,6 +172,8 @@ public class AsyncRootNode<InRowT, OutRowT> implements Downstream<InRowT>, Async } }, source::onError); + completePrefetchFuture(th); + closed = true; } } @@ -181,8 +186,10 @@ public class AsyncRootNode<InRowT, OutRowT> implements Downstream<InRowT>, Async * Starts the execution of the fragment and keeps the result in the intermediate buffer. * * <p>Note: this method must be called by the same thread that will execute the whole fragment. + * + * @return Future representing pending completion of the prefetch operation. */ - public void prefetch() { + public CompletableFuture<Void> startPrefetch() { assert source.context().description().prefetch(); if (waiting == 0) { @@ -192,9 +199,13 @@ public class AsyncRootNode<InRowT, OutRowT> implements Downstream<InRowT>, Async onError(ex); } } + + return prefetchFut; } private void flush() throws Exception { + completePrefetchFuture(null); + // flush may be triggered by prefetching, so let's do nothing in this case if (pendingRequests.isEmpty()) { return; @@ -234,6 +245,21 @@ public class AsyncRootNode<InRowT, OutRowT> implements Downstream<InRowT>, Async } } + /** + * Completes prefetch future if it has not already been completed. + * + * @param ex Exceptional completion cause or {@code null} if the future must complete successfully. 
+ */ + private void completePrefetchFuture(@Nullable Throwable ex) { + if (!prefetchFut.isDone()) { + if (ex != null) { + prefetchFut.completeExceptionally(ex); + } else { + prefetchFut.complete(null); + } + } + } + private static class PendingRequest<OutRowT> { /** * A future to complete when {@link #buff buffer} will be filled. diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java index b6f46a7e3b..7c2d517c85 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java @@ -50,6 +50,7 @@ import org.apache.calcite.tools.Frameworks; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.sql.engine.QueryCancel; +import org.apache.ignite.internal.sql.engine.QueryPrefetchCallback; import org.apache.ignite.internal.sql.engine.metadata.cost.IgniteCostFactory; import org.apache.ignite.internal.sql.engine.rex.IgniteRexBuilder; import org.apache.ignite.internal.sql.engine.schema.IgniteSchema; @@ -152,6 +153,8 @@ public final class BaseQueryContext extends AbstractQueryContext { private final Object[] parameters; + private final QueryPrefetchCallback prefetchCallback; + private CalciteCatalogReader catalogReader; /** @@ -162,7 +165,8 @@ public final class BaseQueryContext extends AbstractQueryContext { FrameworkConfig cfg, QueryCancel cancel, Object[] parameters, - IgniteLogger log + IgniteLogger log, + QueryPrefetchCallback prefetchCallback ) { super(Contexts.chain(cfg.getContext())); @@ -173,6 +177,7 @@ public final class BaseQueryContext extends AbstractQueryContext { this.log = log; this.cancel = cancel; this.parameters = parameters; + this.prefetchCallback = prefetchCallback; 
typeFactory = TYPE_FACTORY; @@ -225,6 +230,10 @@ public final class BaseQueryContext extends AbstractQueryContext { return Objects.requireNonNull(schema().unwrap(IgniteSchema.class)).version(); } + public QueryPrefetchCallback prefetchCallback() { + return prefetchCallback; + } + /** * Returns calcite catalog reader. */ @@ -285,6 +294,8 @@ public final class BaseQueryContext extends AbstractQueryContext { private Object[] parameters = ArrayUtils.OBJECT_EMPTY_ARRAY; + private QueryPrefetchCallback prefetchCallback; + public Builder frameworkConfig(FrameworkConfig frameworkCfg) { this.frameworkCfg = Objects.requireNonNull(frameworkCfg); return this; @@ -305,13 +316,18 @@ public final class BaseQueryContext extends AbstractQueryContext { return this; } + public Builder prefetchCallback(QueryPrefetchCallback prefetchCallback) { + this.prefetchCallback = prefetchCallback; + return this; + } + public Builder parameters(Object... parameters) { this.parameters = Objects.requireNonNull(parameters); return this; } public BaseQueryContext build() { - return new BaseQueryContext(queryId, frameworkCfg, cancel, parameters, log); + return new BaseQueryContext(queryId, frameworkCfg, cancel, parameters, log, prefetchCallback); } } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java index 5ade4b6ffd..fb1524a9b3 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java @@ -22,12 +22,15 @@ import static org.apache.ignite.internal.lang.IgniteStringFormatter.format; import static org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFIG; import static org.apache.ignite.internal.testframework.IgniteTestUtils.await; import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn; import static org.apache.ignite.lang.ErrorGroups.Common.NODE_LEFT_ERR; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; @@ -42,13 +45,14 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; -import java.util.IdentityHashMap; import java.util.List; import java.util.Map; import java.util.Queue; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; @@ -57,7 +61,10 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.apache.calcite.jdbc.CalciteSchema; import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.tools.Frameworks; @@ -68,6 +75,7 @@ import org.apache.ignite.internal.metrics.MetricManager; import 
org.apache.ignite.internal.sql.engine.NodeLeftException; import org.apache.ignite.internal.sql.engine.QueryCancel; import org.apache.ignite.internal.sql.engine.QueryCancelledException; +import org.apache.ignite.internal.sql.engine.QueryPrefetchCallback; import org.apache.ignite.internal.sql.engine.exec.ExecutionServiceImplTest.TestCluster.TestNode; import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler; import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget; @@ -579,6 +587,115 @@ public class ExecutionServiceImplTest extends BaseIgniteAbstractTest { assertThat(stopFuture, willSucceedIn(10, TimeUnit.SECONDS)); } + /** + * Tests the ability to run multiple statements using {@link QueryPrefetchCallback}. Each subsequent + * statement begins execution after the prefetching for the previous statement is completed. + * + * @throws Exception If failed. + */ + @Test + public void testPrefetchCallbackInvocation() throws Exception { + String query = "SELECT * FROM test_tbl"; + int totalStatements = 20; + Collection<AsyncCursor<List<Object>>> resultCursors = new ArrayBlockingQueue<>(totalStatements); + List<String> queries = IntStream.range(0, totalStatements).boxed().map(n -> query).collect(Collectors.toList()); + ArrayBlockingQueue<String> queriesQueue = new ArrayBlockingQueue<>(totalStatements, false, queries); + AtomicReference<AssertionError> errHolder = new AtomicReference<>(); + ExecutionService execService = executionServices.get(0); + + Function<QueryPrefetchCallback, BaseQueryContext> createCtx = (callback) -> BaseQueryContext.builder() + .cancel(new QueryCancel()) + .prefetchCallback(callback) + .frameworkConfig( + Frameworks.newConfigBuilder(FRAMEWORK_CONFIG) + .defaultSchema(wrap(schema)) + .build() + ) + .logger(log) + .build(); + + QueryPrefetchCallback prefetchListener = new QueryPrefetchCallback() { + @Override + public void onPrefetchComplete(@Nullable Throwable err) { + try { + assertThat(err, nullValue()); + + String sql 
= queriesQueue.poll(); + + assertThat(sql, notNullValue()); + + BaseQueryContext ctx = createCtx.apply(queriesQueue.isEmpty() ? null : this); + InternalTransaction tx = new NoOpTransaction(nodeNames.get(0)); + QueryPlan plan = prepare(sql, ctx); + + resultCursors.add( + execService.executePlan(tx, plan, ctx) + ); + } catch (AssertionError e) { + errHolder.set(e); + } catch (Throwable t) { + errHolder.set(new AssertionError(t)); + } + } + }; + + // Start statements execution. + prefetchListener.onPrefetchComplete(null); + + waitForCondition(() -> resultCursors.size() == queries.size(), TIMEOUT_IN_MS); + + if (errHolder.get() != null) { + throw errHolder.get(); + } + + assertEquals(queries.size(), resultCursors.size()); + + resultCursors.forEach(AsyncCursor::closeAsync); + } + + /** + * Test ensures that an exception during data prefetching is propagated to the callback. + */ + @Test + public void testErrorIsPropagatedToPrefetchCallback() { + ExecutionService execService = executionServices.get(0); + CompletableFuture<Void> prefetchFut = new CompletableFuture<>(); + IgniteInternalException expectedException = new IgniteInternalException(Common.INTERNAL_ERR, "Expected exception"); + + BaseQueryContext ctx = BaseQueryContext.builder() + .cancel(new QueryCancel()) + .prefetchCallback(prefetchFut::completeExceptionally) + .frameworkConfig( + Frameworks.newConfigBuilder(FRAMEWORK_CONFIG) + .defaultSchema(wrap(schema)) + .build() + ) + .logger(log) + .build(); + + testCluster.node(nodeNames.get(2)).interceptor((nodeName, msg, original) -> { + if (msg instanceof QueryStartRequest) { + testCluster.node(nodeNames.get(2)).messageService().send(nodeName, new SqlQueryMessagesFactory().queryStartResponse() + .queryId(((QueryStartRequest) msg).queryId()) + .fragmentId(((QueryStartRequest) msg).fragmentId()) + .error(expectedException) + .build() + ); + } else { + original.onMessage(nodeName, msg); + } + + return CompletableFuture.completedFuture(null); + }); + + QueryPlan plan = 
prepare("SELECT * FROM test_tbl", ctx); + AsyncCursor<List<Object>> cursor = execService.executePlan(new NoOpTransaction(nodeNames.get(0)), plan, ctx); + + assertThat(prefetchFut, willThrow(equalTo(expectedException))); + + cursor.closeAsync(); + } + /** Creates an execution service instance for the node with given consistent id. */ public ExecutionServiceImpl<Object[]> create(String nodeName) { if (!nodeNames.contains(nodeName)) { @@ -918,8 +1035,8 @@ public class ExecutionServiceImplTest extends BaseIgniteAbstractTest { private static class CapturingMailboxRegistry implements MailboxRegistry { private final MailboxRegistry delegate; - private final Set<Inbox<?>> inboxes = Collections.newSetFromMap(new IdentityHashMap<>()); - private final Set<Outbox<?>> outboxes = Collections.newSetFromMap(new IdentityHashMap<>()); + private final Set<Inbox<?>> inboxes = Collections.newSetFromMap(new ConcurrentHashMap<>()); + private final Set<Outbox<?>> outboxes = Collections.newSetFromMap(new ConcurrentHashMap<>()); CapturingMailboxRegistry(MailboxRegistry delegate) { this.delegate = delegate;