This is an automated email from the ASF dual-hosted git repository. amashenkov pushed a commit to branch ignite-26156 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 618af38648065c55dc1b6b0520d0a07a9bf703eb Author: amashenkov <[email protected]> AuthorDate: Tue Sep 2 14:48:48 2025 +0300 wip --- .../sql/planner/ItSqlPlannerCommandTest.java | 18 ++- .../cli/call/sql/InvalidatePlannerCacheCall.java | 16 +-- modules/client-handler/build.gradle | 1 + .../ignite/internal/rest/api/sql/SqlQueryApi.java | 20 ++++ .../internal/rest/sql/SqlQueryController.java | 14 +++ modules/sql-engine-api/build.gradle | 1 + .../internal/sql/engine/api/IgniteSqlInternal.java | 18 +++ .../ignite/internal/sql/api/IgniteSqlImpl.java | 7 +- .../ignite/internal/sql/api/IgniteSqlInternal.java | 10 -- .../ignite/internal/sql/engine/QueryProcessor.java | 11 +- .../internal/sql/engine/SqlQueryProcessor.java | 5 +- .../sql/engine/prepare/PrepareService.java | 11 +- .../sql/engine/prepare/PrepareServiceImpl.java | 126 +++++++++++++++------ 13 files changed, 189 insertions(+), 69 deletions(-) diff --git a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/sql/planner/ItSqlPlannerCommandTest.java b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/sql/planner/ItSqlPlannerCommandTest.java index 0f51473f34a..5fa41a89a18 100644 --- a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/sql/planner/ItSqlPlannerCommandTest.java +++ b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/sql/planner/ItSqlPlannerCommandTest.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.cli.commands.sql.planner; import static org.apache.ignite.internal.cli.commands.Options.Constants.CLUSTER_URL_OPTION; -import static org.junit.jupiter.api.Assertions.assertAll; import org.apache.ignite.internal.cli.commands.sql.CliSqlCommandTestBase; import org.apache.ignite.internal.cli.commands.sql.SqlCommand; @@ -32,10 +31,17 @@ class ItSqlPlannerCommandTest extends CliSqlCommandTestBase { void clearCache() { execute("sql", "plan", "clear-cache", CLUSTER_URL_OPTION, NODE_URL); - assertAll( - () -> assertExitCodeIs(1), - this::assertOutputIsEmpty, - this::assertErrOutputIsEmpty - ); + assertExitCodeIs(0); + assertErrOutputIsEmpty(); + assertOutputContains("Successfully cleared SQL query plan cache."); + } + + @Test + void clearCacheFiltered() { + execute("sql", "plan", "clear-cache", CLUSTER_URL_OPTION, NODE_URL, "--tables", "PUBLIC.\"test\""); + + assertExitCodeIs(0); + assertErrOutputIsEmpty(); + assertOutputContains("Successfully cleared SQL query plan cache."); } } diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/call/sql/InvalidatePlannerCacheCall.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/call/sql/InvalidatePlannerCacheCall.java index 19ee55db064..0885127c40d 100644 --- a/modules/cli/src/main/java/org/apache/ignite/internal/cli/call/sql/InvalidatePlannerCacheCall.java +++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/call/sql/InvalidatePlannerCacheCall.java @@ -18,9 +18,7 @@ package org.apache.ignite.internal.cli.call.sql; import jakarta.inject.Singleton; -import org.apache.ignite.internal.cli.call.configuration.JsonString; import org.apache.ignite.internal.cli.core.call.Call; -import org.apache.ignite.internal.cli.core.call.CallOutput; import org.apache.ignite.internal.cli.core.call.DefaultCallOutput; import org.apache.ignite.internal.cli.core.exception.IgniteCliApiException; import org.apache.ignite.internal.cli.core.rest.ApiClientFactory; @@ -31,7 +29,7 @@ import org.apache.ignite.rest.client.invoker.ApiException; * Shows node configuration from ignite cluster. */ @Singleton -public class InvalidatePlannerCacheCall implements Call<InvalidateCacheCallInput, JsonString> { +public class InvalidatePlannerCacheCall implements Call<InvalidateCacheCallInput, String> { private final ApiClientFactory clientFactory; public InvalidatePlannerCacheCall(ApiClientFactory clientFactory) { @@ -40,21 +38,17 @@ public class InvalidatePlannerCacheCall implements Call<InvalidateCacheCallInput /** {@inheritDoc} */ @Override - public CallOutput<JsonString> execute(InvalidateCacheCallInput input) { + public DefaultCallOutput<String> execute(InvalidateCacheCallInput input) { SqlApi client = createApiClient(input); try { - return DefaultCallOutput.success(invalidateSqlPlannerCache(client, input)); - } catch (ApiException | IllegalArgumentException e) { + client.clearCache(input.getTables()); + return DefaultCallOutput.success("Successfully cleared SQL query plan cache."); + } catch (ApiException e) { return DefaultCallOutput.failure(new IgniteCliApiException(e, input.clusterUrl())); } } - private JsonString invalidateSqlPlannerCache(SqlApi api, InvalidateCacheCallInput input) throws ApiException { - // TODO: IGNITE-25872 serialize table names and implement api call. - return JsonString.fromString(""); - } - private SqlApi createApiClient(InvalidateCacheCallInput input) { return new SqlApi(clientFactory.getClient(input.clusterUrl())); } diff --git a/modules/client-handler/build.gradle b/modules/client-handler/build.gradle index d6b6bfbe84d..aed396cf689 100644 --- a/modules/client-handler/build.gradle +++ b/modules/client-handler/build.gradle @@ -30,6 +30,7 @@ dependencies { implementation project(':ignite-configuration-root') implementation project(':ignite-api') implementation project(':ignite-table') + implementation project(':ignite-sql-engine-api') implementation project(':ignite-sql-engine') implementation project(':ignite-network') implementation project(':ignite-core') diff --git a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/sql/SqlQueryApi.java b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/sql/SqlQueryApi.java index 9425219c522..d62784ad6d8 100644 --- a/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/sql/SqlQueryApi.java +++ b/modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/sql/SqlQueryApi.java @@ -23,6 +23,7 @@ import static org.apache.ignite.internal.rest.constants.MediaType.APPLICATION_JS import io.micronaut.http.annotation.Controller; import io.micronaut.http.annotation.Delete; import io.micronaut.http.annotation.Get; +import io.micronaut.http.annotation.QueryValue; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.media.ArraySchema; import io.swagger.v3.oas.annotations.media.Content; @@ -30,6 +31,8 @@ import io.swagger.v3.oas.annotations.media.Schema; import io.swagger.v3.oas.annotations.responses.ApiResponse; import io.swagger.v3.oas.annotations.tags.Tag; import java.util.Collection; +import java.util.Optional; +import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; import org.apache.ignite.internal.rest.api.Problem; @@ -104,4 +107,21 @@ public interface SqlQueryApi { CompletableFuture<Void> killQuery( @Schema(name = "queryId", description = "The unique identifier of the sql query.", requiredMode = REQUIRED) UUID queryId ); + + /** + * Invalidates SQL query planner cache. + * + * @return The result of the operation. + */ + @Operation( + summary = "Invalidates SQL planner cache.", + description = "Invalidates SQL planner cache records on node that related to provided table names.") + @ApiResponse(responseCode = "200", description = "Successfully cleared SQL query plan cache.") + @Get("plan/clear-cache") + CompletableFuture<Void> clearCache( + @QueryValue + @Schema(description = "SQL query plans, which are related to given tables, will be evicted from cache. Case-sensitive, " + + "cache will be reset if empty.") + Optional<Set<String>> tableNames + ); } diff --git a/modules/rest/src/main/java/org/apache/ignite/internal/rest/sql/SqlQueryController.java b/modules/rest/src/main/java/org/apache/ignite/internal/rest/sql/SqlQueryController.java index 14a1bb42c1a..50f179b2b18 100644 --- a/modules/rest/src/main/java/org/apache/ignite/internal/rest/sql/SqlQueryController.java +++ b/modules/rest/src/main/java/org/apache/ignite/internal/rest/sql/SqlQueryController.java @@ -25,6 +25,8 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; import java.util.List; +import java.util.Optional; +import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; import org.apache.ignite.internal.logger.IgniteLogger; @@ -34,8 +36,10 @@ import org.apache.ignite.internal.rest.api.sql.SqlQueryApi; import org.apache.ignite.internal.rest.api.sql.SqlQueryInfo; import org.apache.ignite.internal.rest.sql.exception.SqlQueryKillException; import org.apache.ignite.internal.rest.sql.exception.SqlQueryNotFoundException; +import org.apache.ignite.internal.sql.engine.api.IgniteSqlInternal; import org.apache.ignite.internal.sql.engine.api.kill.CancellableOperationType; import org.apache.ignite.internal.sql.engine.api.kill.KillHandlerRegistry; +import org.apache.ignite.internal.wrapper.Wrappers; import org.apache.ignite.sql.IgniteSql; import org.apache.ignite.sql.SqlRow; import org.apache.ignite.sql.Statement; @@ -87,6 +91,16 @@ public class SqlQueryController implements SqlQueryApi, ResourceHolder { } } + @Override + public CompletableFuture<Void> clearCache(Optional<Set<String>> tableNames) { + try { + return Wrappers.unwrap(igniteSql, IgniteSqlInternal.class).invalidatePlannerCache(tableNames.orElse(Set.of())); + } catch (Exception e) { + LOG.error("Failed to invalidate SQL planner cache.", e); + return failedFuture(e); + } + } + private static Void handleOperationResult(UUID queryId, @Nullable Boolean result) { if (result != null && !result) { throw new SqlQueryNotFoundException(queryId.toString()); diff --git a/modules/sql-engine-api/build.gradle b/modules/sql-engine-api/build.gradle index bc8420b1d7f..7e87523e2a4 100644 --- a/modules/sql-engine-api/build.gradle +++ b/modules/sql-engine-api/build.gradle @@ -21,6 +21,7 @@ apply from: "$rootDir/buildscripts/java-junit5.gradle" apply from: "$rootDir/buildscripts/java-test-fixtures.gradle" dependencies { + implementation project(':ignite-api') implementation project(':ignite-core') implementation libs.jetbrains.annotations diff --git a/modules/sql-engine-api/src/main/java/org/apache/ignite/internal/sql/engine/api/IgniteSqlInternal.java b/modules/sql-engine-api/src/main/java/org/apache/ignite/internal/sql/engine/api/IgniteSqlInternal.java new file mode 100644 index 00000000000..d0f2ba150b2 --- /dev/null +++ b/modules/sql-engine-api/src/main/java/org/apache/ignite/internal/sql/engine/api/IgniteSqlInternal.java @@ -0,0 +1,18 @@ +package org.apache.ignite.internal.sql.engine.api; + +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.sql.IgniteSql; + +/** + * Internal SQL facade. + */ +public interface IgniteSqlInternal extends IgniteSql { + + /** + * Invalidates planner cache. + * + * @return Operation completion future. + */ + CompletableFuture<Void> invalidatePlannerCache(Set<String> strings); +} diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java index 2e4abcb7aa8..9f126d0cbd5 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java @@ -28,6 +28,7 @@ import java.util.Collections; import java.util.EnumSet; import java.util.List; import java.util.Objects; +import java.util.Set; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -55,6 +56,7 @@ import org.apache.ignite.internal.sql.engine.InternalSqlRow; import org.apache.ignite.internal.sql.engine.QueryProcessor; import org.apache.ignite.internal.sql.engine.SqlProperties; import org.apache.ignite.internal.sql.engine.SqlQueryType; +import org.apache.ignite.internal.sql.engine.api.IgniteSqlInternal; import org.apache.ignite.internal.sql.engine.util.Commons; import org.apache.ignite.internal.tx.InternalTransaction; import org.apache.ignite.internal.util.ArrayUtils; @@ -66,7 +68,6 @@ import org.apache.ignite.lang.CancellationToken; import org.apache.ignite.lang.TraceableException; import org.apache.ignite.lang.util.IgniteNameUtils; import org.apache.ignite.sql.BatchedArguments; -import org.apache.ignite.sql.IgniteSql; import org.apache.ignite.sql.ResultSet; import org.apache.ignite.sql.SqlBatchException; import org.apache.ignite.sql.SqlException; @@ -650,8 +651,8 @@ public class IgniteSqlImpl implements IgniteSqlInternal, IgniteComponent { } @Override - public void invalidatePlannerCache() { - queryProcessor.invalidatePlannerCache(); + public CompletableFuture<Void> invalidatePlannerCache(Set<String> tableNames) { + return queryProcessor.invalidatePlannerCache(tableNames); } private static void validateDmlResult(AsyncCursor.BatchedResult<InternalSqlRow> page) { diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlInternal.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlInternal.java deleted file mode 100644 index a560c28728a..00000000000 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlInternal.java +++ /dev/null @@ -1,10 +0,0 @@ -package org.apache.ignite.internal.sql.api; - -import org.apache.ignite.sql.IgniteSql; - -/** - * Internal SQL facade. - */ -public interface IgniteSqlInternal extends IgniteSql { - void invalidatePlannerCache(); -} diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java index 4d0fc33d5b1..011fd386298 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java @@ -17,11 +17,13 @@ package org.apache.ignite.internal.sql.engine; +import java.util.Set; import java.util.concurrent.CompletableFuture; import org.apache.ignite.internal.hlc.HybridTimestampTracker; import org.apache.ignite.internal.manager.IgniteComponent; import org.apache.ignite.internal.sql.engine.prepare.QueryMetadata; import org.apache.ignite.internal.tx.InternalTransaction; +import org.apache.ignite.internal.util.CompletableFutures; import org.apache.ignite.lang.CancellationToken; import org.apache.ignite.lang.IgniteException; import org.jetbrains.annotations.Nullable; @@ -71,7 +73,12 @@ public interface QueryProcessor extends IgniteComponent { Object... params ); - default void invalidatePlannerCache() { - + /** + * Invalidates planner cache. + * + * @return Operation completion future. + */ + default CompletableFuture<Void> invalidatePlannerCache(Set<String> tableNames) { + return CompletableFutures.nullCompletedFuture(); } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java index cd2f7eaa992..5617b9a5ca9 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java @@ -32,6 +32,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Objects; +import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; @@ -605,8 +606,8 @@ public class SqlQueryProcessor implements QueryProcessor, SystemViewProvider { } @Override - public void invalidatePlannerCache() { - prepareSvc.invalidateCache(); + public CompletableFuture<Void> invalidatePlannerCache(Set<String> tableNames) { + return prepareSvc.invalidateCache(tableNames); } /** Completes the provided future when the callback is called. */ diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareService.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareService.java index 172da6c2a9f..55dfaf71dfd 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareService.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareService.java @@ -17,10 +17,12 @@ package org.apache.ignite.internal.sql.engine.prepare; +import java.util.Set; import java.util.concurrent.CompletableFuture; import org.apache.ignite.internal.sql.engine.SqlOperationContext; import org.apache.ignite.internal.sql.engine.exec.LifecycleAware; import org.apache.ignite.internal.sql.engine.sql.ParsedResult; +import org.apache.ignite.internal.util.CompletableFutures; /** * Preparation service that accepts an AST of the query and returns a prepared query plan. @@ -37,5 +39,12 @@ public interface PrepareService extends LifecycleAware { */ CompletableFuture<QueryPlan> prepareAsync(ParsedResult parsedResult, SqlOperationContext ctx); - default void invalidateCache() {}; + /** + * Invalidates planner cache. + * + * @return Operation completion future. + */ + default CompletableFuture<Void> invalidateCache(Set<String> tableNames) { + return CompletableFutures.nullCompletedFuture(); + }; } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java index 964524581ba..4de2f875d37 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PrepareServiceImpl.java @@ -30,6 +30,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -37,6 +38,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Predicate; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.schema.SchemaPlus; @@ -71,6 +73,8 @@ import org.apache.ignite.internal.sql.engine.rel.IgniteKeyValueGet; import org.apache.ignite.internal.sql.engine.rel.IgniteKeyValueModify; import org.apache.ignite.internal.sql.engine.rel.IgniteRel; import org.apache.ignite.internal.sql.engine.rel.IgniteSelectCount; +import org.apache.ignite.internal.sql.engine.rel.IgniteTableFunctionScan; +import org.apache.ignite.internal.sql.engine.rel.IgniteTableModify; import org.apache.ignite.internal.sql.engine.schema.IgniteSchemas; import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager; import org.apache.ignite.internal.sql.engine.sql.IgniteSqlExplain; @@ -248,7 +252,7 @@ public class PrepareServiceImpl implements PrepareService { boolean explicitTx = operationContext.txContext() != null && operationContext.txContext().explicitTx() != null; long timestamp = operationContext.operationTime().longValue(); - int catalogVersion = schemaManager.catalogVersion(timestamp); + int catalogVersion = schemaManager.catalogVersion(timestamp); CacheKey key = createCacheKey(parsedResult, catalogVersion, schemaName, operationContext.parameters()); @@ -305,10 +309,28 @@ public class PrepareServiceImpl implements PrepareService { ); } - /** Invalidates planner cache. */ + /** {@inheritDoc} */ @Override - public void invalidateCache() { - cache.clear(); + public CompletableFuture<Void> invalidateCache(Set<String> tableNames) { + return CompletableFuture.supplyAsync(() -> { + if (tableNames.isEmpty()) { + cache.clear(); + } else { + cache.removeIfValue(p -> p.isDone() && planMatches(p.join(), tableNames::contains)); + } + + return null; + }, planningPool); + } + + private boolean planMatches(QueryPlan plan, Predicate<String> containsTable) { + assert plan instanceof ExplainablePlan; + + MatchingShuttle shuttle = new MatchingShuttle(containsTable); + + ((ExplainablePlan) plan).getRel().accept(shuttle); + + return shuttle.matches(); } private CompletableFuture<QueryPlan> prepareAsync0( @@ -461,25 +483,25 @@ public class PrepareServiceImpl implements PrepareService { IgniteKeyValueGet kvGet = (IgniteKeyValueGet) optimizedRel; return new KeyValueGetPlan( - nextPlanId(), - catalogVersion, - kvGet, + nextPlanId(), + catalogVersion, + kvGet, resultSetMetadata, parameterMetadata, - relWithMetadata.paMetadata, + relWithMetadata.paMetadata, relWithMetadata.ppMetadata ); } var plan = new MultiStepPlan( - nextPlanId(), - SqlQueryType.QUERY, - optimizedRel, - resultSetMetadata, - parameterMetadata, - catalogVersion, - relWithMetadata.numSources, - fastPlan, + nextPlanId(), + SqlQueryType.QUERY, + optimizedRel, + resultSetMetadata, + parameterMetadata, + catalogVersion, + relWithMetadata.numSources, + fastPlan, relWithMetadata.paMetadata, relWithMetadata.ppMetadata ); @@ -545,10 +567,10 @@ public class PrepareServiceImpl implements PrepareService { if (optimizedRel instanceof IgniteKeyValueModify) { plan = new KeyValueModifyPlan( nextPlanId(), - ctx.catalogVersion(), + ctx.catalogVersion(), (IgniteKeyValueModify) optimizedRel, DML_METADATA, - parameterMetadata, + parameterMetadata, relWithMetadata.paMetadata, relWithMetadata.ppMetadata ); @@ -558,9 +580,9 @@ public class PrepareServiceImpl implements PrepareService { SqlQueryType.DML, optimizedRel, DML_METADATA, parameterMetadata, - ctx.catalogVersion(), - relWithMetadata.numSources, - null, + ctx.catalogVersion(), + relWithMetadata.numSources, + null, relWithMetadata.paMetadata, relWithMetadata.ppMetadata ); @@ -617,25 +639,25 @@ public class PrepareServiceImpl implements PrepareService { IgniteKeyValueModify kvModify = (IgniteKeyValueModify) optimizedRel; plan = new KeyValueModifyPlan( - nextPlanId(), + nextPlanId(), catalogVersion, kvModify, DML_METADATA, - parameterMetadata, - relWithMetadata.paMetadata, + parameterMetadata, + relWithMetadata.paMetadata, relWithMetadata.ppMetadata ); } else { plan = new MultiStepPlan( - nextPlanId(), + nextPlanId(), SqlQueryType.DML, optimizedRel, - DML_METADATA, - parameterMetadata, + DML_METADATA, + parameterMetadata, catalogVersion, - relWithMetadata.numSources, - null, - relWithMetadata.paMetadata, + relWithMetadata.numSources, + null, + relWithMetadata.paMetadata, relWithMetadata.ppMetadata ); } @@ -773,9 +795,9 @@ public class PrepareServiceImpl implements PrepareService { } private RelWithMetadata doOptimize( - PlanningContext ctx, - SqlNode validatedNode, - IgnitePlanner planner, + PlanningContext ctx, + SqlNode validatedNode, + IgnitePlanner planner, @Nullable Runnable onTimeoutAction ) { // Convert to Relational operators graph @@ -912,7 +934,7 @@ public class PrepareServiceImpl implements PrepareService { RelWithMetadata( IgniteRel rel, int numSources, - @Nullable PartitionAwarenessMetadata paMetadata, + @Nullable PartitionAwarenessMetadata paMetadata, @Nullable PartitionPruningMetadata ppMetadata ) { this.rel = rel; @@ -921,4 +943,40 @@ public class PrepareServiceImpl implements PrepareService { this.ppMetadata = ppMetadata; } } + + private static class MatchingShuttle extends IgniteRelShuttle { + private final Predicate<String> containsTable; + boolean matches; + + private MatchingShuttle(Predicate<String> containsTable) { + this.containsTable = containsTable; + matches = false; + } + + /** + * Visits all children of a parent. + */ + @Override + protected IgniteRel processNode(IgniteRel rel) { + if (!matches && rel.getTable() != null) { + matches = containsTable.test(rel.getTable().getQualifiedName().toString()); + } + + if (matches) { + return rel; + } + + List<IgniteRel> inputs = Commons.cast(rel.getInputs()); + + for (int i = 0; i < inputs.size() && !matches; i++) { + visit(inputs.get(i)); + } + + return rel; + } + + boolean matches() { + return matches; + } + } }
