xtern commented on code in PR #5143:
URL: https://github.com/apache/ignite-3/pull/5143#discussion_r1939000012
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutor.java:
##########
@@ -241,6 +254,229 @@ CompletableFuture<AsyncSqlCursor<InternalSqlRow>>
executeChildQuery(
});
}
+ CompletableFuture<AsyncSqlCursor<InternalSqlRow>> executeChildBatch(
+ Query parent,
+ QueryTransactionContext scriptTxContext,
+ int batchOffset,
+ List<ParsedResultWithNextCursorFuture> batch
+ ) {
+ if (IgniteUtils.assertionsEnabled()) {
+ int offsetInBatch = 0;
+ for (ParsedResultWithNextCursorFuture item : batch) {
+ assert item.parsedQuery.queryType() == SqlQueryType.DDL
+ : item.parsedQuery.queryType() + " at statement #" +
(batchOffset + offsetInBatch);
+
+ offsetInBatch++;
+ }
+ }
+
+ if (!busyLock.enterBusy()) {
+ return failedFuture(new NodeStoppingException());
+ }
+
+ List<Query> queries = new ArrayList<>(batch.size());
+
+ try {
+ int offsetInBatch = 0;
+ for (ParsedResultWithNextCursorFuture item : batch) {
+ Query query = new Query(
+ Instant.ofEpochMilli(clockService.now().getPhysical()),
+ parent,
+ item.parsedQuery,
+ batchOffset + offsetInBatch,
+ idGenerator.next(),
+ scriptTxContext,
+ ArrayUtils.OBJECT_EMPTY_ARRAY,
+ item.nextCursorFuture
+ );
+
+ offsetInBatch++;
+
+ trackQuery(query, null);
+
+ queries.add(query);
+
+ parent.cancel.attach(query.cancel);
+ }
+ } catch (QueryCancelledException ex) {
+ queries.forEach(query -> query.onError(ex));
+
+ return failedFuture(ex);
+ } finally {
+ busyLock.leaveBusy();
+ }
+
+ List<CompletableFuture<?>> preparedQueryFutures = new
ArrayList<>(batch.size());
+ for (Query query : queries) {
+
preparedQueryFutures.add(Programs.SCRIPT_ITEM_PREPARATION.run(query));
+ }
+
+ return CompletableFutures.allOf(preparedQueryFutures)
+ .handle((none, ignored) -> {
+ List<DdlPlan> dllPlans = new ArrayList<>();
+ for (Query query : queries) {
+ QueryPlan plan = query.plan;
+ if (plan == null) {
+ assert query.error.get() != null;
+
+ break;
+ }
+
+ dllPlans.add((DdlPlan) plan);
+ }
+
+ return dllPlans;
+ })
+ .thenCompose(dllPlans -> {
Review Comment:
```suggestion
.thenCompose(ddlPlans -> {
```
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutor.java:
##########
@@ -241,6 +254,229 @@ CompletableFuture<AsyncSqlCursor<InternalSqlRow>>
executeChildQuery(
});
}
+ CompletableFuture<AsyncSqlCursor<InternalSqlRow>> executeChildBatch(
+ Query parent,
+ QueryTransactionContext scriptTxContext,
+ int batchOffset,
+ List<ParsedResultWithNextCursorFuture> batch
+ ) {
+ if (IgniteUtils.assertionsEnabled()) {
+ int offsetInBatch = 0;
+ for (ParsedResultWithNextCursorFuture item : batch) {
+ assert item.parsedQuery.queryType() == SqlQueryType.DDL
+ : item.parsedQuery.queryType() + " at statement #" +
(batchOffset + offsetInBatch);
+
+ offsetInBatch++;
+ }
+ }
+
+ if (!busyLock.enterBusy()) {
+ return failedFuture(new NodeStoppingException());
+ }
+
+ List<Query> queries = new ArrayList<>(batch.size());
+
+ try {
+ int offsetInBatch = 0;
+ for (ParsedResultWithNextCursorFuture item : batch) {
+ Query query = new Query(
+ Instant.ofEpochMilli(clockService.now().getPhysical()),
+ parent,
+ item.parsedQuery,
+ batchOffset + offsetInBatch,
+ idGenerator.next(),
+ scriptTxContext,
+ ArrayUtils.OBJECT_EMPTY_ARRAY,
+ item.nextCursorFuture
+ );
+
+ offsetInBatch++;
+
+ trackQuery(query, null);
+
+ queries.add(query);
+
+ parent.cancel.attach(query.cancel);
+ }
+ } catch (QueryCancelledException ex) {
+ queries.forEach(query -> query.onError(ex));
+
+ return failedFuture(ex);
+ } finally {
+ busyLock.leaveBusy();
+ }
+
+ List<CompletableFuture<?>> preparedQueryFutures = new
ArrayList<>(batch.size());
+ for (Query query : queries) {
+
preparedQueryFutures.add(Programs.SCRIPT_ITEM_PREPARATION.run(query));
+ }
+
+ return CompletableFutures.allOf(preparedQueryFutures)
+ .handle((none, ignored) -> {
+ List<DdlPlan> dllPlans = new ArrayList<>();
Review Comment:
```suggestion
List<DdlPlan> ddlPlans = new ArrayList<>();
```
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java:
##########
@@ -63,35 +65,64 @@ public DdlCommandHandler(
this.clockService = clockService;
}
+ /**
+ * Submits given list of commands at once.
+ *
+ * <p>The whole list is submitted atomically. The result of applying of
any individual command in case of conditional statements
+ * may be checked by calling {@link CatalogApplyResult#isApplied(int)}
providing 0-based index of command in question. If exception
+ * is thrown during processing of any command from batch, then none of the
commands will be applied.
+ *
+ * @param batch A batch of command to execute.
+ * @return Future containing result of applying a list of commands to
catalog.
+ */
+ public CompletableFuture<CatalogApplyResult> handle(List<CatalogCommand>
batch) {
+ CompletableFuture<CatalogApplyResult> fut =
catalogManager.execute(batch);
+
+ List<AbstractCreateIndexCommand> createIndexCommands = batch.stream()
+ .filter(AbstractCreateIndexCommand.class::isInstance)
+ .map(AbstractCreateIndexCommand.class::cast)
+ .collect(Collectors.toList());
+
+ if (createIndexCommands.isEmpty()) {
+ return fut;
+ }
+
+ return fut.thenCompose(applyResult ->
+ inBusyLock(busyLock, () -> {
+ List<CompletableFuture<CatalogApplyResult>> toWait =
createIndexCommands.stream()
+ .map(cmd ->
waitTillIndexBecomesAvailableOrRemoved(cmd, applyResult))
+ .collect(Collectors.toList());
+
+ return CompletableFutures.allOf(toWait).thenApply(none ->
applyResult);
+ })
+ );
+ }
+
/**
* Handles ddl commands.
*
* @param cmd Catalog command.
- * @return Future representing pending completion of the operation. If the
command execution resulted in a modification of the catalog,
- * the result will be the activation timestamp of the new catalog
version, if the command did not result in a change of the
- * catalog, the result will be {@code null}.
+ * @return Future containing result of applying a commands to catalog.
*/
- public CompletableFuture<@Nullable Long> handle(CatalogCommand cmd) {
+ public CompletableFuture<CatalogApplyResult> handle(CatalogCommand cmd) {
CompletableFuture<CatalogApplyResult> fut =
catalogManager.execute(cmd);
if (cmd instanceof AbstractCreateIndexCommand) {
fut = fut.thenCompose(applyResult ->
inBusyLock(busyLock, () ->
waitTillIndexBecomesAvailableOrRemoved((AbstractCreateIndexCommand) cmd,
applyResult)));
}
- return fut.thenApply(applyResult -> applyResult.isApplied(0) ?
applyResult.getCatalogTime() : null);
+ return fut;
}
- private CompletionStage<CatalogApplyResult>
waitTillIndexBecomesAvailableOrRemoved(
+ private CompletableFuture<CatalogApplyResult>
waitTillIndexBecomesAvailableOrRemoved(
AbstractCreateIndexCommand cmd,
CatalogApplyResult catalogApplyResult
) {
if (!catalogApplyResult.isApplied(0)) {
Review Comment:
It looks like we now need to check the result of the desired index command:
`isApplied(indexCommandIndex)`, not `isApplied(0)`, or am I missing something?
##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java:
##########
@@ -407,6 +408,30 @@ public CompletableFuture<AsyncDataCursor<InternalSqlRow>>
executePlan(
}
}
+ @Override
+ public CompletableFuture<List<AsyncDataCursor<InternalSqlRow>>>
executeDdlBatch(
+ List<DdlPlan> batch,
+ Consumer<HybridTimestamp> activationTimeListener
+ ) {
+ List<CatalogCommand> commands = batch.stream()
+ .map(DdlPlan::command)
+ .collect(Collectors.toList());
+
+ return ddlCmdHnd.handle(commands).thenApply(result -> {
+
activationTimeListener.accept(HybridTimestamp.hybridTimestamp(result.getCatalogTime()));
+
+ List<AsyncDataCursor<InternalSqlRow>> cursors = new
ArrayList<>(commands.size());
+ for (int i = 0; i < commands.size(); i++) {
+
Review Comment:
Is this extra blank line intentional?
I'd rather add a blank line before `for...` :sunglasses:
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]