This is an automated email from the ASF dual-hosted git repository. gvvinblade pushed a commit to branch ignite-12248 in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-12248 by this push: new 54a02e1 query cache improvements 54a02e1 is described below commit 54a02e1976e26bd3e3ab54c084a9fc8ae61e45e6 Author: Igor Seliverstov <gvvinbl...@gmail.com> AuthorDate: Tue Sep 29 15:49:44 2020 +0300 query cache improvements --- .../query/calcite/exec/ExecutionServiceImpl.java | 160 ++++++++++----------- .../calcite/prepare/AbstractMultiStepPlan.java | 62 +------- .../processors/query/calcite/prepare/Cloner.java | 65 +++------ .../query/calcite/prepare/ExplainPlan.java | 5 +- .../processors/query/calcite/prepare/Fragment.java | 2 +- .../prepare/{QueryPlan.java => FragmentPlan.java} | 40 +++--- .../query/calcite/prepare/FragmentSplitter.java | 111 +------------- .../{Splitter.java => IgniteRelShuttle.java} | 103 +++---------- .../query/calcite/prepare/MultiStepDmlPlan.java | 13 +- .../query/calcite/prepare/MultiStepQueryPlan.java | 13 +- ...stractMultiStepPlan.java => QueryMappings.java} | 101 +++++-------- .../query/calcite/prepare/QueryPlan.java | 6 +- .../processors/query/calcite/prepare/Splitter.java | 120 +--------------- 13 files changed, 210 insertions(+), 591 deletions(-) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java index 6e3c0a7..59d6d49 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java @@ -28,6 +28,8 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import com.google.common.collect.ImmutableList; +import org.apache.calcite.plan.Context; +import org.apache.calcite.plan.Contexts; import org.apache.calcite.plan.ConventionTraitDef; import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.plan.RelTraitDef; @@ -83,6 +85,7 @@ import org.apache.ignite.internal.processors.query.calcite.prepare.CalciteQueryF import org.apache.ignite.internal.processors.query.calcite.prepare.ExplainPlan; import org.apache.ignite.internal.processors.query.calcite.prepare.Fragment; import org.apache.ignite.internal.processors.query.calcite.prepare.FragmentDescription; +import org.apache.ignite.internal.processors.query.calcite.prepare.FragmentPlan; import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner; import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepDmlPlan; import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepPlan; @@ -366,9 +369,9 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut String qry, Object[] params ) { - PlanningContext pctx = createContext(ctx, schema, qry, params); + PlanningContext pctx = createContext(Commons.convert(ctx), topologyVersion(), localNodeId(), schema, qry, params); - List<QueryPlan> qryPlans = prepareQueryPlan(pctx); + List<QueryPlan> qryPlans = queryPlanCache().queryPlan(pctx, new CacheKey(pctx.schemaName(), pctx.query()), this::prepareQuery); return executePlans(qryPlans, pctx); } @@ -453,12 +456,8 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut } /** */ - private PlanningContext createContext( - @Nullable QueryContext qryCtx, - @Nullable String schemaName, - String qry, - Object[] params - ) { + private PlanningContext createContext(Context parent, AffinityTopologyVersion topVer, UUID originator, + @Nullable String schema, String qry, Object[] params) { RelTraitDef<?>[] traitDefs = { ConventionTraitDef.INSTANCE, RelCollationTraitDef.INSTANCE, @@ -468,56 +467,23 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut return PlanningContext.builder() .localNodeId(localNodeId()) - .parentContext(Commons.convert(qryCtx)) + .originatingNodeId(originator) + .parentContext(parent) .frameworkConfig(Frameworks.newConfigBuilder(FRAMEWORK_CONFIG) - .defaultSchema(schemaName != null - ? schemaHolder().schema().getSubSchema(schemaName) + .defaultSchema(schema != null + ? schemaHolder().schema().getSubSchema(schema) : schemaHolder().schema()) .traitDefs(traitDefs) .build()) .query(qry) .parameters(params) - .topologyVersion(topologyVersion()) - .logger(log) - .build(); - } - - /** */ - private PlanningContext createContext( - @Nullable String schemaName, - UUID originatingNodeId, - AffinityTopologyVersion topVer - ) { - // TODO pass to context user locale and timezone. - - RelTraitDef<?>[] traitDefs = { - ConventionTraitDef.INSTANCE, - RelCollationTraitDef.INSTANCE, - DistributionTraitDef.INSTANCE, - RewindabilityTraitDef.INSTANCE - }; - - return PlanningContext.builder() - .localNodeId(localNodeId()) - .originatingNodeId(originatingNodeId) - .frameworkConfig(Frameworks.newConfigBuilder(FRAMEWORK_CONFIG) - .defaultSchema(schemaName != null - ? schemaHolder().schema().getSubSchema(schemaName) - : schemaHolder().schema()) - .traitDefs(traitDefs) - .build()) .topologyVersion(topVer) .logger(log) .build(); } /** */ - private List<QueryPlan> prepareQueryPlan(PlanningContext ctx) { - return queryPlanCache().queryPlan(ctx, new CacheKey(ctx.schemaName(), ctx.query()), this::prepare0); - } - - /** */ - private List<QueryPlan> prepare0(PlanningContext ctx) { + private List<QueryPlan> prepareQuery(PlanningContext ctx) { try { String qry = ctx.query(); @@ -552,6 +518,11 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut } /** */ + private List<QueryPlan> prepareFragment(PlanningContext ctx) { + return ImmutableList.of(new FragmentPlan(fromJson(ctx, ctx.query()))); + } + + /** */ private QueryPlan prepareSingle(SqlNode sqlNode, PlanningContext ctx) throws ValidationException { assert single(sqlNode); @@ -741,7 +712,7 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut QueryStartRequest req = new QueryStartRequest( qryId, pctx.schemaName(), - fragment0.rootSerialized(), + fragment0.serialized(), pctx.topologyVersion(), fragmentDesc0, pctx.parameters()); @@ -771,6 +742,56 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut } /** */ + private void executeFragment(UUID qryId, FragmentPlan plan, PlanningContext pctx, FragmentDescription fragmentDesc) { + ExecutionContext<Row> ectx = new ExecutionContext<>(taskExecutor(), pctx, qryId, + fragmentDesc, handler, Commons.parametersMap(pctx.parameters())); + + long frId = fragmentDesc.fragmentId(); + UUID origNodeId = pctx.originatingNodeId(); + + Outbox<Row> node; + try { + node = new LogicalRelImplementor<>( + ectx, + partitionService(), + mailboxRegistry(), + exchangeService(), + failureProcessor()) + .go(plan.root()); + } + catch (Exception ex) { + U.error(log, "Failed to build execution tree. ", ex); + + mailboxRegistry.outboxes(qryId, frId, -1) + .forEach(Outbox::close); + mailboxRegistry.inboxes(qryId, frId, -1) + .forEach(Inbox::close); + + try { + messageService().send(origNodeId, new QueryStartResponse(qryId, frId, ex)); + } + catch (IgniteCheckedException e) { + U.warn(log, "Failed to send reply. [nodeId=" + origNodeId + ']', e); + } + + return; + } + + try { + messageService().send(origNodeId, new QueryStartResponse(qryId, frId)); + } + catch (IgniteCheckedException e) { + U.warn(log, "Failed to send reply. [nodeId=" + origNodeId + ']', e); + + node.onNodeLeft(origNodeId); + + return; + } + + node.init(); + } + + /** */ private void register(QueryInfo info) { UUID qryId = info.ctx.queryId(); PlanningContext pctx = info.ctx.planningContext(); @@ -832,50 +853,15 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut private void onMessage(UUID nodeId, QueryStartRequest msg) { assert nodeId != null && msg != null; - PlanningContext ctx = createContext(msg.schema(), nodeId, msg.topologyVersion()); - ExecutionContext<Row> execCtx = new ExecutionContext<>(taskExecutor(), ctx, msg.queryId(), - msg.fragmentDescription(), handler, Commons.parametersMap(msg.parameters())); - - Outbox<Row> node; - try { - node = new LogicalRelImplementor<>( - execCtx, - partitionService(), - mailboxRegistry(), - exchangeService(), - failureProcessor()) - .go(fromJson(ctx, msg.root())); - } - catch (Exception ex) { - U.error(log, "Failed to build execution tree. ", ex); - - mailboxRegistry.outboxes(msg.queryId(), msg.fragmentId(), -1) - .forEach(Outbox::close); - mailboxRegistry.inboxes(msg.queryId(), msg.fragmentId(), -1) - .forEach(Inbox::close); + PlanningContext pctx = createContext(Contexts.empty(), msg.topologyVersion(), nodeId, msg.schema(), msg.root(), msg.parameters()); - try { - messageService().send(nodeId, new QueryStartResponse(msg.queryId(), msg.fragmentId(), ex)); - } - catch (IgniteCheckedException e) { - U.warn(log, "Failed to send reply. [nodeId=" + nodeId + ']', e); - } + List<QueryPlan> qryPlans = queryPlanCache().queryPlan(pctx, new CacheKey(pctx.schemaName(), pctx.query()), this::prepareFragment); - return; - } + assert qryPlans.size() == 1 && qryPlans.get(0).type() == QueryPlan.Type.FRAGMENT; - try { - messageService().send(nodeId, new QueryStartResponse(msg.queryId(), msg.fragmentDescription().fragmentId())); - } - catch (IgniteCheckedException e) { - U.warn(log, "Failed to send reply. [nodeId=" + nodeId + ']', e); + FragmentPlan plan = (FragmentPlan)qryPlans.get(0); - node.onNodeLeft(nodeId); - - return; - } - - node.init(); + executeFragment(msg.queryId(), plan, pctx, msg.fragmentDescription()); } /** */ diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java index 47aa7c8..f146e39 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java @@ -21,13 +21,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; - -import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; -import org.apache.ignite.internal.processors.query.IgniteSQLException; import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService; import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping; -import org.apache.ignite.internal.processors.query.calcite.metadata.OptimisticPlanningException; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender; import org.apache.ignite.internal.util.typedef.F; @@ -44,12 +40,16 @@ public abstract class AbstractMultiStepPlan implements MultiStepPlan { protected final List<GridQueryFieldMetadata> fieldsMeta; /** */ + protected final QueryMappings queryMappings; + + /** */ protected Map<Long, NodesMapping> mappings; /** */ - protected AbstractMultiStepPlan(List<Fragment> fragments, List<GridQueryFieldMetadata> fieldsMeta) { + protected AbstractMultiStepPlan(List<Fragment> fragments, List<GridQueryFieldMetadata> fieldsMeta, QueryMappings queryMappings) { this.fragments = fragments; this.fieldsMeta = fieldsMeta; + this.queryMappings = queryMappings; } /** {@inheritDoc} */ @@ -92,61 +92,11 @@ public abstract class AbstractMultiStepPlan implements MultiStepPlan { /** {@inheritDoc} */ @Override public void init(MappingService mappingService, PlanningContext ctx) { - mappings = U.newHashMap(fragments.size()); - - RelMetadataQuery mq = F.first(fragments).root().getCluster().getMetadataQuery(); - - for (int i = 0, j = 0; i < fragments.size();) { - Fragment fragment = fragments.get(i); - - try { - mappings.put(fragment.fragmentId(), fragment.map(mappingService, ctx, mq)); - - i++; - } - catch (OptimisticPlanningException e) { - if (++j > 3) - throw new IgniteSQLException("Failed to map query.", e); - - replace(fragment, new FragmentSplitter(e.node()).go(fragment)); - - // restart init routine. - mappings.clear(); - i = 0; - } - } + mappings = queryMappings.map(mappingService, ctx, fragments); } /** */ private NodesMapping fragmentMapping(long fragmentId) { return mappings == null ? null : mappings.get(fragmentId); } - - /** */ - private void replace(Fragment fragment, List<Fragment> replacement) { - assert !F.isEmpty(replacement); - - Map<Long, Long> newTargets = new HashMap<>(); - - for (Fragment fragment0 : replacement) { - for (IgniteReceiver remote : fragment0.remotes()) - newTargets.put(remote.exchangeId(), fragment0.fragmentId()); - } - - for (int i = 0; i < fragments.size(); i++) { - Fragment fragment0 = fragments.get(i); - - if (fragment0 == fragment) - fragments.set(i, F.first(replacement)); - else if (!fragment0.local()) { - IgniteSender sender = (IgniteSender)fragment0.root(); - Long newTargetId = newTargets.get(sender.exchangeId()); - - if (newTargetId != null) - sender.targetFragmentId(newTargetId); - } - } - - fragments.addAll(replacement.subList(1, replacement.size())); - } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java index 9e22c47..aeee7f5 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Cloner.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.query.calcite.prepare; -import java.util.ArrayList; import java.util.List; import com.google.common.collect.ImmutableList; import org.apache.calcite.plan.RelOptCluster; @@ -42,7 +41,6 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchang import org.apache.ignite.internal.processors.query.calcite.rel.IgniteUnionAll; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteValues; import org.apache.ignite.internal.processors.query.calcite.util.Commons; -import org.apache.ignite.internal.util.typedef.F; /** */ class Cloner implements IgniteRelVisitor<IgniteRel> { @@ -50,7 +48,7 @@ class Cloner implements IgniteRelVisitor<IgniteRel> { private final RelOptCluster cluster; /** */ - private FragmentProto curr; + private ImmutableList.Builder<IgniteReceiver> remotes; Cloner(RelOptCluster cluster) { this.cluster = cluster; @@ -59,22 +57,31 @@ class Cloner implements IgniteRelVisitor<IgniteRel> { /** * Clones and associates a plan with a new cluster. * - * @param src Plan to clone. + * @param src Fragments to clone. * @return New plan. */ - List<Fragment> go(List<Fragment> src) { - assert !F.isEmpty(src); + public List<Fragment> go(List<Fragment> src) { + return Commons.transform(src, this::go); + } + + /** + * Clones and associates a plan with a new cluster. + * + * @param src Fragment to clone. + * @return New plan. + */ + public Fragment go(Fragment src) { + try { + remotes = ImmutableList.builder(); - List<Fragment> fragments = new ArrayList<>(src.size()); + IgniteRel newRoot = visit(src.root()); + ImmutableList<IgniteReceiver> remotes = this.remotes.build(); - for (Fragment fragment : src) { - curr = new FragmentProto(fragment.fragmentId(), fragment.root(), fragment.rootSerialized()); - curr.root = visit(curr.root); - fragments.add(curr.build()); - curr = null; + return new Fragment(src.fragmentId(), newRoot, remotes, src.serialized()); + } + finally { + remotes = null; } - - return fragments; } /** {@inheritDoc} */ @@ -163,7 +170,8 @@ class Cloner implements IgniteRelVisitor<IgniteRel> { IgniteReceiver receiver = new IgniteReceiver(cluster, rel.getTraitSet(), rel.getRowType(), rel.exchangeId(), rel.sourceFragmentId()); - curr.remotes.add(receiver); + if (remotes != null) + remotes.add(receiver); return receiver; } @@ -203,31 +211,4 @@ class Cloner implements IgniteRelVisitor<IgniteRel> { @Override public IgniteRel visit(IgniteRel rel) { return rel.accept(this); } - - /** */ - private static class FragmentProto { - /** */ - private final long id; - - /** */ - private IgniteRel root; - - /** Serialized representation. */ - private String rootSer; - - /** */ - private final ImmutableList.Builder<IgniteReceiver> remotes = ImmutableList.builder(); - - /** */ - private FragmentProto(long id, IgniteRel root, String rootSer) { - this.id = id; - this.root = root; - this.rootSer = rootSer; - } - - /** */ - Fragment build() { - return new Fragment(id, root, remotes.build(), rootSer); - } - } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ExplainPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ExplainPlan.java index 6d0e3cb..b96a9ca 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ExplainPlan.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ExplainPlan.java @@ -17,9 +17,8 @@ package org.apache.ignite.internal.processors.query.calcite.prepare; import java.util.List; - import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; -import org.jetbrains.annotations.Nullable; +import org.jetbrains.annotations.NotNull; /** * Query explain plan. @@ -46,7 +45,7 @@ public class ExplainPlan implements QueryPlan { } /** {@inheritDoc} */ - @Override public QueryPlan clone(@Nullable PlanningContext ctx) { + @Override public QueryPlan clone(@NotNull PlanningContext ctx) { return this; } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java index 98db988..04b57e2 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java @@ -128,7 +128,7 @@ public class Fragment { * * @return Serialized form. */ - public String rootSerialized() { + public String serialized() { return rootSer != null ? rootSer : (rootSer = toJson(root())); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentPlan.java similarity index 57% copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlan.java copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentPlan.java index 7f38050..dd28f96 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlan.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentPlan.java @@ -17,23 +17,31 @@ package org.apache.ignite.internal.processors.query.calcite.prepare; -import org.jetbrains.annotations.Nullable; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; +import org.jetbrains.annotations.NotNull; -/** - * - */ -public interface QueryPlan { - /** Query type */ - enum Type { QUERY, DML, DDL, EXPLAIN } +/** */ +public class FragmentPlan implements QueryPlan { + /** */ + private final IgniteRel root; + + /** */ + public FragmentPlan(IgniteRel root) { + this.root = root; + } + + /** */ + public IgniteRel root() { + return root; + } - /** - * @return Query type. - */ - Type type(); + /** {@inheritDoc} */ + @Override public Type type() { + return Type.FRAGMENT; + } - /** - * Clones this plan. - * @param ctx Planner context. - */ - QueryPlan clone(@Nullable PlanningContext ctx); + /** {@inheritDoc} */ + @Override public QueryPlan clone(@NotNull PlanningContext ctx) { + return new FragmentPlan(new Cloner(ctx.cluster()).visit(root)); + } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentSplitter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentSplitter.java index 1dbf3cb..0843557 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentSplitter.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/FragmentSplitter.java @@ -26,31 +26,17 @@ import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.type.RelDataType; -import org.apache.ignite.internal.processors.query.calcite.rel.IgniteAggregate; -import org.apache.ignite.internal.processors.query.calcite.rel.IgniteCorrelatedNestedLoopJoin; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange; -import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter; -import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan; -import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapAggregate; -import org.apache.ignite.internal.processors.query.calcite.rel.IgniteNestedLoopJoin; -import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver; -import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReduceAggregate; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; -import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender; -import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSort; -import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify; -import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchange; -import org.apache.ignite.internal.processors.query.calcite.rel.IgniteUnionAll; -import org.apache.ignite.internal.processors.query.calcite.rel.IgniteValues; import org.apache.ignite.internal.processors.query.calcite.util.Commons; /** * */ -public class FragmentSplitter implements IgniteRelVisitor<IgniteRel> { +public class FragmentSplitter extends IgniteRelShuttle { /** */ private final Deque<FragmentProto> stack = new LinkedList<>(); @@ -81,90 +67,7 @@ public class FragmentSplitter implements IgniteRelVisitor<IgniteRel> { } /** {@inheritDoc} */ - @Override public IgniteRel visit(IgniteFilter rel) { - return processNode(rel); - } - - /** {@inheritDoc} */ - @Override public IgniteRel visit(IgniteTrimExchange rel) { - return processNode(rel); - } - - /** {@inheritDoc} */ - @Override public IgniteRel visit(IgniteProject rel) { - return processNode(rel); - } - - /** {@inheritDoc} */ - @Override public IgniteRel visit(IgniteNestedLoopJoin rel) { - return processNode(rel); - } - - /** {@inheritDoc} */ - @Override public IgniteRel visit(IgniteCorrelatedNestedLoopJoin rel) { - return processNode(rel); - } - - /** {@inheritDoc} */ - @Override public IgniteRel visit(IgniteTableModify rel) { - return processNode(rel); - } - - /** {@inheritDoc} */ - @Override public IgniteRel visit(IgniteAggregate rel) { - return processNode(rel); - } - - /** {@inheritDoc} */ - @Override public IgniteRel visit(IgniteMapAggregate rel) { - return processNode(rel); - } - - /** {@inheritDoc} */ - @Override public IgniteRel visit(IgniteReduceAggregate rel) { - assert cutPoint != rel; - - return processNode(rel); - } - - /** {@inheritDoc} */ - @Override public IgniteRel visit(IgniteUnionAll rel) { - return processNode(rel); - } - - /** {@inheritDoc} */ - @Override public IgniteRel visit(IgniteSort rel) { - return processNode(rel); - } - - /** {@inheritDoc} */ - @Override public IgniteRel visit(IgniteSender rel) { - assert cutPoint != rel; - - return processNode(rel); - } - - /** {@inheritDoc} */ - @Override public IgniteRel visit(IgniteIndexScan rel) { - return processNode(rel); - } - - /** {@inheritDoc} */ - @Override public IgniteRel visit(IgniteTableScan rel) { - return processNode(rel); - } - - /** {@inheritDoc} */ - @Override public IgniteRel visit(IgniteValues rel) { - assert cutPoint != rel; - - return rel; - } - - /** {@inheritDoc} */ @Override public IgniteRel visit(IgniteReceiver rel) { - assert cutPoint != rel; - curr.remotes.add(rel); return rel; @@ -183,7 +86,7 @@ public class FragmentSplitter implements IgniteRelVisitor<IgniteRel> { /** * Visits all children of a parent. */ - private IgniteRel processNode(IgniteRel rel) { + @Override protected IgniteRel processNode(IgniteRel rel) { if (rel == cutPoint) { cutPoint = null; @@ -198,16 +101,6 @@ public class FragmentSplitter implements IgniteRelVisitor<IgniteRel> { return rel; } - /** - * Visits a particular child of a parent and replaces the child if it was changed. - */ - private void visitChild(IgniteRel parent, int i, IgniteRel child) { - IgniteRel newChild = visit(child); - - if (newChild != child) - parent.replaceInput(i, newChild); - } - /** */ private IgniteRel split(IgniteRel rel) { RelOptCluster cluster = rel.getCluster(); diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java similarity index 70% copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java index 4616756..5f235c2 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgniteRelShuttle.java @@ -17,12 +17,7 @@ package org.apache.ignite.internal.processors.query.calcite.prepare; -import java.util.ArrayList; -import java.util.Deque; -import java.util.LinkedList; import java.util.List; -import com.google.common.collect.ImmutableList; -import org.apache.calcite.plan.RelOptCluster; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteAggregate; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteCorrelatedNestedLoopJoin; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange; @@ -44,30 +39,11 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteUnionAll; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteValues; import org.apache.ignite.internal.processors.query.calcite.util.Commons; -/** - * Splits a query into a list of query fragments. - */ -public class Splitter implements IgniteRelVisitor<IgniteRel> { - /** */ - private final Deque<FragmentProto> stack = new LinkedList<>(); - - /** */ - private FragmentProto curr; - - /** */ - public List<Fragment> go(IgniteRel root) { - ArrayList<Fragment> res = new ArrayList<>(); - - stack.push(new FragmentProto(Fragment.ID_GEN.getAndIncrement(), root)); - - while (!stack.isEmpty()) { - curr = stack.pop(); - curr.root = visit(curr.root); - res.add(curr.build()); - curr = null; - } - - return res; +/** */ +public class IgniteRelShuttle implements IgniteRelVisitor<IgniteRel> { + /** {@inheritDoc} */ + @Override public IgniteRel visit(IgniteSender rel) { + return processNode(rel); } /** {@inheritDoc} */ @@ -86,11 +62,6 @@ public class Splitter implements IgniteRelVisitor<IgniteRel> { } /** {@inheritDoc} */ - @Override public IgniteRel visit(IgniteSort rel) { - return processNode(rel); - } - - /** {@inheritDoc} */ @Override public IgniteRel visit(IgniteNestedLoopJoin rel) { return processNode(rel); } @@ -101,7 +72,7 @@ public class Splitter implements IgniteRelVisitor<IgniteRel> { } /** {@inheritDoc} */ - @Override public IgniteRel visit(IgniteTableModify rel) { + @Override public IgniteRel visit(IgniteExchange rel) { return processNode(rel); } @@ -121,33 +92,38 @@ public class Splitter implements IgniteRelVisitor<IgniteRel> { } /** {@inheritDoc} */ + @Override public IgniteRel visit(IgniteTableModify rel) { + return processNode(rel); + } + + /** {@inheritDoc} */ @Override public IgniteRel visit(IgniteUnionAll rel) { return processNode(rel); } /** {@inheritDoc} */ - @Override public IgniteRel visit(IgniteSender rel) { + @Override public IgniteRel visit(IgniteSort rel) { return processNode(rel); } /** {@inheritDoc} */ @Override public IgniteRel visit(IgniteIndexScan rel) { - return rel; + return processNode(rel); } /** {@inheritDoc} */ @Override public IgniteRel visit(IgniteTableScan rel) { - return rel; + return processNode(rel); } /** {@inheritDoc} */ - @Override public IgniteRel visit(IgniteValues rel) { - return rel; + @Override public IgniteRel visit(IgniteReceiver rel) { + return processNode(rel); } /** {@inheritDoc} */ - @Override public IgniteRel visit(IgniteReceiver rel) { - throw new AssertionError(); + @Override public IgniteRel visit(IgniteValues rel) { + return processNode(rel); } /** {@inheritDoc} */ @@ -155,27 +131,10 @@ public class Splitter implements IgniteRelVisitor<IgniteRel> { return rel.accept(this); } - /** {@inheritDoc} */ - @Override public IgniteRel visit(IgniteExchange rel) { - RelOptCluster cluster = rel.getCluster(); - - long targetFragmentId = curr.id; - long sourceFragmentId = Fragment.ID_GEN.getAndIncrement(); - long exchangeId = sourceFragmentId; - - IgniteReceiver receiver = new IgniteReceiver(cluster, rel.getTraitSet(), rel.getRowType(), exchangeId, sourceFragmentId); - IgniteSender sender = new IgniteSender(cluster, rel.getTraitSet(), rel.getInput(), exchangeId, targetFragmentId, rel.distribution()); - - curr.remotes.add(receiver); - stack.push(new FragmentProto(sourceFragmentId, sender)); - - return receiver; - } - /** * Visits all children of a parent. */ - private IgniteRel processNode(IgniteRel rel) { + protected IgniteRel processNode(IgniteRel rel) { List<IgniteRel> inputs = Commons.cast(rel.getInputs()); for (int i = 0; i < inputs.size(); i++) @@ -187,32 +146,10 @@ public class Splitter implements IgniteRelVisitor<IgniteRel> { /** * Visits a particular child of a parent and replaces the child if it was changed. */ - private void visitChild(IgniteRel parent, int i, IgniteRel child) { + protected void visitChild(IgniteRel parent, int i, IgniteRel child) { IgniteRel newChild = visit(child); if (newChild != child) parent.replaceInput(i, newChild); } - - /** */ - private static class FragmentProto { - /** */ - private final long id; - - /** */ - private IgniteRel root; - - /** */ - private final ImmutableList.Builder<IgniteReceiver> remotes = ImmutableList.builder(); - - /** */ - private FragmentProto(long id, IgniteRel root) { - this.id = id; - this.root = root; - } - - Fragment build() { - return new Fragment(id, root, remotes.build()); - } - } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepDmlPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepDmlPlan.java index 75cf46a..c36e608 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepDmlPlan.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepDmlPlan.java @@ -18,9 +18,9 @@ package org.apache.ignite.internal.processors.query.calcite.prepare; import java.util.List; - import com.google.common.collect.ImmutableList; import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; +import org.jetbrains.annotations.NotNull; /** * Distributed dml plan. @@ -38,7 +38,12 @@ public class MultiStepDmlPlan extends AbstractMultiStepPlan { * @param fieldsMeta Fields metadata. */ public MultiStepDmlPlan(List<Fragment> fragments, List<GridQueryFieldMetadata> fieldsMeta) { - super(fragments, fieldsMeta); + this(fragments, fieldsMeta, new QueryMappings()); + } + + /** */ + private MultiStepDmlPlan(List<Fragment> fragments, List<GridQueryFieldMetadata> fieldsMeta, QueryMappings mappings) { + super(fragments, fieldsMeta, mappings); } /** {@inheritDoc} */ @@ -48,7 +53,7 @@ public class MultiStepDmlPlan extends AbstractMultiStepPlan { /** {@inheritDoc} * @param ctx*/ - @Override public QueryPlan clone(PlanningContext ctx) { - return new MultiStepDmlPlan(new Cloner(ctx.cluster()).go(fragments), fieldsMetadata()); + @Override public QueryPlan clone(@NotNull PlanningContext ctx) { + return new MultiStepDmlPlan(new Cloner(ctx.cluster()).go(fragments), fieldsMeta, queryMappings); } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepQueryPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepQueryPlan.java index 7b64c9b..5d88964 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepQueryPlan.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepQueryPlan.java @@ -18,9 +18,9 @@ package org.apache.ignite.internal.processors.query.calcite.prepare; import java.util.List; - import com.google.common.collect.ImmutableList; import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; +import org.jetbrains.annotations.NotNull; /** * Distributed query plan. @@ -39,7 +39,12 @@ public class MultiStepQueryPlan extends AbstractMultiStepPlan { * @param fieldsMeta Fields metadata. */ public MultiStepQueryPlan(List<Fragment> fragments, List<GridQueryFieldMetadata> fieldsMeta) { - super(fragments, fieldsMeta); + this(fragments, fieldsMeta, new QueryMappings()); + } + + /** */ + private MultiStepQueryPlan(List<Fragment> fragments, List<GridQueryFieldMetadata> fieldsMeta, QueryMappings mappings) { + super(fragments, fieldsMeta, mappings); } /** {@inheritDoc} */ @@ -49,7 +54,7 @@ public class MultiStepQueryPlan extends AbstractMultiStepPlan { /** {@inheritDoc} * @param ctx*/ - @Override public QueryPlan clone(PlanningContext ctx) { - return new MultiStepQueryPlan(new Cloner(ctx.cluster()).go(fragments), fieldsMetadata()); + @Override public QueryPlan clone(@NotNull PlanningContext ctx) { + return new MultiStepQueryPlan(new Cloner(ctx.cluster()).go(fragments), fieldsMeta, queryMappings); } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryMappings.java similarity index 56% copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryMappings.java index 47aa7c8..7c145c5 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryMappings.java @@ -20,10 +20,11 @@ package org.apache.ignite.internal.processors.query.calcite.prepare; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.UUID; - +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; +import com.google.common.collect.ImmutableMap; import org.apache.calcite.rel.metadata.RelMetadataQuery; -import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.query.IgniteSQLException; import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService; import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping; @@ -31,99 +32,71 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.OptimisticPl import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.U; - -/** - * - */ -public abstract class AbstractMultiStepPlan implements MultiStepPlan { - /** */ - protected final List<Fragment> fragments; - - /** */ - protected final List<GridQueryFieldMetadata> fieldsMeta; +/** */ +public class QueryMappings { /** */ - protected Map<Long, NodesMapping> mappings; + private static class Mapping { + /** */ + private final AffinityTopologyVersion ver; - /** */ - protected AbstractMultiStepPlan(List<Fragment> fragments, List<GridQueryFieldMetadata> fieldsMeta) { - this.fragments = fragments; - this.fieldsMeta = fieldsMeta; - } - - /** {@inheritDoc} */ - @Override public List<Fragment> fragments() { - return fragments; - } + /** */ + private final Map<Long, NodesMapping> nodeMappings; - /** {@inheritDoc} */ - @Override public List<GridQueryFieldMetadata> fieldsMetadata() { - return fieldsMeta; - } - - /** {@inheritDoc} */ - @Override public NodesMapping fragmentMapping(Fragment fragment) { - return fragmentMapping(fragment.fragmentId()); - } - - /** {@inheritDoc} */ - @Override public NodesMapping targetMapping(Fragment fragment) { - if (fragment.local()) - return null; - - return fragmentMapping(((IgniteSender)fragment.root()).targetFragmentId()); + /** */ + private Mapping(AffinityTopologyVersion ver, Map<Long, NodesMapping> nodeMappings) { + this.ver = ver; + this.nodeMappings = nodeMappings; + } } - /** {@inheritDoc} */ - @Override public Map<Long, List<UUID>> remoteSources(Fragment fragment) { - List<IgniteReceiver> remotes = fragment.remotes(); - - if (F.isEmpty(remotes)) - return null; + /** */ + private final AtomicReference<Mapping> mapping = new AtomicReference<>(); - HashMap<Long, List<UUID>> res = U.newHashMap(remotes.size()); + /** */ + public Map<Long, NodesMapping> map(MappingService mappingService, PlanningContext ctx, List<Fragment> fragments) { + Mapping mapping = this.mapping.get(); - for (IgniteReceiver remote : remotes) - res.put(remote.exchangeId(), fragmentMapping(remote.sourceFragmentId()).nodes()); + if (mapping != null && Objects.equals(mapping.ver, ctx.topologyVersion())) + return mapping.nodeMappings; - return res; - } - - /** {@inheritDoc} */ - @Override public void init(MappingService mappingService, PlanningContext ctx) { - mappings = U.newHashMap(fragments.size()); + ImmutableMap.Builder<Long, NodesMapping> b = ImmutableMap.builder(); RelMetadataQuery mq = F.first(fragments).root().getCluster().getMetadataQuery(); + boolean save = true; for (int i = 0, j = 0; i < fragments.size();) { Fragment fragment = fragments.get(i); try { - mappings.put(fragment.fragmentId(), fragment.map(mappingService, ctx, mq)); + b.put(fragment.fragmentId(), fragment.map(mappingService, ctx, mq)); i++; } catch (OptimisticPlanningException e) { + save = false; // we mustn't save mappings for mutated fragments + if (++j > 3) throw new IgniteSQLException("Failed to map query.", e); - replace(fragment, new FragmentSplitter(e.node()).go(fragment)); + replace(fragments, fragment, new FragmentSplitter(e.node()).go(fragment)); // restart init routine. - mappings.clear(); + b = ImmutableMap.builder(); i = 0; } } - } - /** */ - private NodesMapping fragmentMapping(long fragmentId) { - return mappings == null ? null : mappings.get(fragmentId); + ImmutableMap<Long, NodesMapping> mappings = b.build(); + + if (save) + this.mapping.compareAndSet(mapping, new Mapping(ctx.topologyVersion(), mappings)); + + return mappings; } /** */ - private void replace(Fragment fragment, List<Fragment> replacement) { + private void replace(List<Fragment> fragments, Fragment fragment, List<Fragment> replacement) { assert !F.isEmpty(replacement); Map<Long, Long> newTargets = new HashMap<>(); diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlan.java index 7f38050..1c204f0 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlan.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlan.java @@ -17,14 +17,14 @@ package org.apache.ignite.internal.processors.query.calcite.prepare; -import org.jetbrains.annotations.Nullable; +import org.jetbrains.annotations.NotNull; /** * */ public interface QueryPlan { /** Query type */ - enum Type { QUERY, DML, DDL, EXPLAIN } + enum Type { QUERY, FRAGMENT, DML, DDL, EXPLAIN } /** * @return Query type. @@ -35,5 +35,5 @@ public interface QueryPlan { * Clones this plan. * @param ctx Planner context. */ - QueryPlan clone(@Nullable PlanningContext ctx); + QueryPlan clone(@NotNull PlanningContext ctx); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java index 4616756..464efa1 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Splitter.java @@ -23,31 +23,15 @@ import java.util.LinkedList; import java.util.List; import com.google.common.collect.ImmutableList; import org.apache.calcite.plan.RelOptCluster; -import org.apache.ignite.internal.processors.query.calcite.rel.IgniteAggregate; -import org.apache.ignite.internal.processors.query.calcite.rel.IgniteCorrelatedNestedLoopJoin; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange; -import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter; -import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan; -import org.apache.ignite.internal.processors.query.calcite.rel.IgniteMapAggregate; -import org.apache.ignite.internal.processors.query.calcite.rel.IgniteNestedLoopJoin; -import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver; -import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReduceAggregate; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; -import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor; import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender; -import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSort; -import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableModify; -import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan; -import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTrimExchange; -import org.apache.ignite.internal.processors.query.calcite.rel.IgniteUnionAll; -import org.apache.ignite.internal.processors.query.calcite.rel.IgniteValues; -import org.apache.ignite.internal.processors.query.calcite.util.Commons; /** * Splits a query into a list of query fragments. */ -public class Splitter implements IgniteRelVisitor<IgniteRel> { +public class Splitter extends IgniteRelShuttle { /** */ private final Deque<FragmentProto> stack = new LinkedList<>(); @@ -71,91 +55,11 @@ public class Splitter implements IgniteRelVisitor<IgniteRel> { } /** {@inheritDoc} */ - @Override public IgniteRel visit(IgniteFilter rel) { - return processNode(rel); - } - - /** {@inheritDoc} */ - @Override public IgniteRel visit(IgniteTrimExchange rel) { - return processNode(rel); - } - - /** {@inheritDoc} */ - @Override public IgniteRel visit(IgniteProject rel) { - return processNode(rel); - } - - /** {@inheritDoc} */ - @Override public IgniteRel visit(IgniteSort rel) { - return processNode(rel); - } - - /** {@inheritDoc} */ - @Override public IgniteRel visit(IgniteNestedLoopJoin rel) { - return processNode(rel); - } - - /** {@inheritDoc} */ - @Override public IgniteRel visit(IgniteCorrelatedNestedLoopJoin rel) { - return processNode(rel); - } - - /** {@inheritDoc} */ - @Override public IgniteRel visit(IgniteTableModify rel) { - return processNode(rel); - } - - /** {@inheritDoc} */ - @Override public IgniteRel visit(IgniteAggregate rel) { - return processNode(rel); - } - - /** {@inheritDoc} */ - @Override public IgniteRel visit(IgniteMapAggregate rel) { - return processNode(rel); - } - - /** {@inheritDoc} */ - @Override public IgniteRel visit(IgniteReduceAggregate rel) { - return processNode(rel); - } - - /** {@inheritDoc} */ - @Override public IgniteRel visit(IgniteUnionAll rel) { - return processNode(rel); - } - - /** {@inheritDoc} */ - @Override public IgniteRel visit(IgniteSender rel) { - return processNode(rel); - } - - /** {@inheritDoc} */ - @Override public IgniteRel visit(IgniteIndexScan rel) { - return rel; - } - - /** {@inheritDoc} */ - @Override public IgniteRel visit(IgniteTableScan rel) { - return rel; - } - - /** {@inheritDoc} */ - @Override public IgniteRel visit(IgniteValues rel) { - return rel; - } - - /** {@inheritDoc} */ @Override public IgniteRel visit(IgniteReceiver rel) { throw new AssertionError(); } /** {@inheritDoc} */ - @Override public IgniteRel visit(IgniteRel rel) { - return rel.accept(this); - } - - /** {@inheritDoc} */ @Override public IgniteRel visit(IgniteExchange rel) { RelOptCluster cluster = rel.getCluster(); @@ -172,28 +76,6 @@ public class Splitter implements IgniteRelVisitor<IgniteRel> { return receiver; } - /** - * Visits all children of a parent. - */ - private IgniteRel processNode(IgniteRel rel) { - List<IgniteRel> inputs = Commons.cast(rel.getInputs()); - - for (int i = 0; i < inputs.size(); i++) - visitChild(rel, i, inputs.get(i)); - - return rel; - } - - /** - * Visits a particular child of a parent and replaces the child if it was changed. - */ - private void visitChild(IgniteRel parent, int i, IgniteRel child) { - IgniteRel newChild = visit(child); - - if (newChild != child) - parent.replaceInput(i, newChild); - } - /** */ private static class FragmentProto { /** */