http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java index 07ff0ab..d879d36 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java @@ -20,6 +20,8 @@ package org.apache.hyracks.algebricks.core.rewriter.base; import java.util.Properties; +import org.apache.hyracks.algebricks.core.config.AlgebricksConfig; + public class PhysicalOptimizationConfig { private static final int MB = 1048576; @@ -31,10 +33,11 @@ public class PhysicalOptimizationConfig { private static final String MAX_FRAMES_FOR_TEXTSEARCH = "MAX_FRAMES_FOR_TEXTSEARCH"; private static final String FUDGE_FACTOR = "FUDGE_FACTOR"; private static final String MAX_RECORDS_PER_FRAME = "MAX_RECORDS_PER_FRAME"; - private static final String DEFAULT_HASH_GROUP_TABLE_SIZE = "DEFAULT_HASH_GROUP_TABLE_SIZE"; private static final String DEFAULT_EXTERNAL_GROUP_TABLE_SIZE = "DEFAULT_EXTERNAL_GROUP_TABLE_SIZE"; private static final String DEFAULT_IN_MEM_HASH_JOIN_TABLE_SIZE = "DEFAULT_IN_MEM_HASH_JOIN_TABLE_SIZE"; + private static final String SORT_PARALLEL = "SORT_PARALLEL"; + private static final String SORT_SAMPLES = "SORT_SAMPLES"; private Properties properties = new Properties(); @@ -143,6 +146,22 @@ public class PhysicalOptimizationConfig { 
setInt(DEFAULT_IN_MEM_HASH_JOIN_TABLE_SIZE, tableSize); } + public boolean getSortParallel() { + return getBoolean(SORT_PARALLEL, AlgebricksConfig.SORT_PARALLEL); + } + + public void setSortParallel(boolean sortParallel) { + setBoolean(SORT_PARALLEL, sortParallel); + } + + public int getSortSamples() { + return getInt(SORT_SAMPLES, AlgebricksConfig.SORT_SAMPLES); + } + + public void setSortSamples(int sortSamples) { + setInt(SORT_SAMPLES, sortSamples); + } + private void setInt(String property, int value) { properties.setProperty(property, Integer.toString(value)); } @@ -167,4 +186,16 @@ public class PhysicalOptimizationConfig { return Double.parseDouble(value); } + private void setBoolean(String property, boolean value) { + properties.setProperty(property, Boolean.toString(value)); + } + + private boolean getBoolean(String property, boolean defaultValue) { + String value = properties.getProperty(property); + if (value == null) { + return defaultValue; + } else { + return Boolean.parseBoolean(value); + } + } }
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/DotFormatGenerator.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/DotFormatGenerator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/DotFormatGenerator.java index 8ada0ac..d6895e3 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/DotFormatGenerator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/DotFormatGenerator.java @@ -29,12 +29,16 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan; -import org.apache.hyracks.algebricks.core.algebra.base.IPhysicalOperator; import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractReplicateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator; +import org.apache.hyracks.api.constraints.Constraint; +import org.apache.hyracks.api.constraints.expressions.ConstraintExpression; +import org.apache.hyracks.api.constraints.expressions.LValueConstraintExpression; +import org.apache.hyracks.api.constraints.expressions.PartitionCountExpression; +import org.apache.hyracks.api.constraints.expressions.PartitionLocationExpression; 
import org.apache.hyracks.api.dataflow.ActivityId; import org.apache.hyracks.api.dataflow.ConnectorDescriptorId; import org.apache.hyracks.api.dataflow.IActivity; @@ -45,15 +49,13 @@ import org.apache.hyracks.api.job.JobSpecification; public class DotFormatGenerator { - private DotFormatGenerator() { - } + private final LogicalOperatorDotVisitor dotVisitor = new LogicalOperatorDotVisitor(); /** - * Generates DOT format for {@link JobActivityGraph} that can be visualized - * using any DOT format visualizer. + * Generates DOT format plan for {@link JobActivityGraph} that can be visualized using any DOT format visualizer. * * @param jobActivityGraph The job activity graph - * @return DOT format + * @return DOT format plan */ public static String generate(final JobActivityGraph jobActivityGraph) { final DotFormatBuilder graphBuilder = new DotFormatBuilder(DotFormatBuilder.StringValue.of("JobActivityGraph")); @@ -146,92 +148,74 @@ public class DotFormatGenerator { } /** - * Generates DOT format for {@link JobSpecification} that can be visualized - * using any DOT format visualizer. + * Generates DOT format plan for {@link JobSpecification} that can be visualized using any DOT format visualizer. 
* * @param jobSpecification The job specification - * @return DOT format + * @return DOT format plan */ public static String generate(final JobSpecification jobSpecification) { final DotFormatBuilder graphBuilder = new DotFormatBuilder(DotFormatBuilder.StringValue.of("JobSpecification")); final Map<ConnectorDescriptorId, IConnectorDescriptor> connectorMap = jobSpecification.getConnectorMap(); - final Map<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>> cOp = + final Set<Constraint> constraints = jobSpecification.getUserConstraints(); + Map<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>> cOp = jobSpecification.getConnectorOperatorMap(); - ConnectorDescriptorId connectorId; - IConnectorDescriptor connector; - IOperatorDescriptor leftOperator; - IOperatorDescriptor rightOperator; - DotFormatBuilder.Node sourceNode; - DotFormatBuilder.Node destinationNode; - String source; - String destination; - String edgeLabel; - for (Map.Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>> entry : cOp - .entrySet()) { - connectorId = entry.getKey(); - connector = connectorMap.get(connectorId); - edgeLabel = connector.getClass().getName().substring(connector.getClass().getName().lastIndexOf(".") + 1); - edgeLabel += "-" + connectorId; - leftOperator = entry.getValue().getLeft().getLeft(); - rightOperator = entry.getValue().getRight().getLeft(); - source = leftOperator.getClass().getName() - .substring(leftOperator.getClass().getName().lastIndexOf(".") + 1); - sourceNode = - graphBuilder.createNode(DotFormatBuilder.StringValue.of(leftOperator.getOperatorId().toString()), - DotFormatBuilder.StringValue.of(leftOperator.toString() + "-" + source)); - destination = rightOperator.getClass().getName() - .substring(rightOperator.getClass().getName().lastIndexOf(".") + 1); - destinationNode = - 
graphBuilder.createNode(DotFormatBuilder.StringValue.of(rightOperator.getOperatorId().toString()), - DotFormatBuilder.StringValue.of(rightOperator.toString() + "-" + destination)); - graphBuilder.createEdge(sourceNode, destinationNode).setLabel(DotFormatBuilder.StringValue.of(edgeLabel)); - } - + cOp.forEach((connId, srcAndDest) -> addToGraph(graphBuilder, constraints, connectorMap, connId, srcAndDest)); return graphBuilder.getDotDocument(); } /** - * Generates DOT format for {@link ILogicalPlan} that can be visualized - * using any DOT format visualizer. + * Generates DOT format plan for {@link ILogicalPlan} that can be visualized using any DOT format visualizer. * * @param plan The logical plan - * @param dotVisitor The DOT visitor - * @return DOT format - * @throws AlgebricksException + * @param showDetails whether to show the details of the operator like physical properties + * @return DOT format plan + * @throws AlgebricksException When one operator throws an exception while visiting it. */ - public static String generate(ILogicalPlan plan, LogicalOperatorDotVisitor dotVisitor) throws AlgebricksException { - final DotFormatBuilder graphBuilder = new DotFormatBuilder(DotFormatBuilder.StringValue.of("Plan")); + public String generate(ILogicalPlan plan, boolean showDetails) throws AlgebricksException { ILogicalOperator root = plan.getRoots().get(0).getValue(); - generateNode(graphBuilder, root, dotVisitor, new HashSet<>()); + return generate(root, showDetails); + } + + /** + * Generates DOT format plan considering "startingOp" as the root operator. + * + * @param startingOp the starting operator + * @param showDetails whether to show the details of the operator like physical properties + * @return DOT format plan + * @throws AlgebricksException When one operator throws an exception while visiting it. 
+ */ + public String generate(ILogicalOperator startingOp, boolean showDetails) throws AlgebricksException { + final DotFormatBuilder graphBuilder = new DotFormatBuilder(DotFormatBuilder.StringValue.of("Plan")); + generateNode(graphBuilder, startingOp, showDetails, new HashSet<>()); return graphBuilder.getDotDocument(); } - public static void generateNode(DotFormatBuilder dotBuilder, ILogicalOperator op, - LogicalOperatorDotVisitor dotVisitor, Set<ILogicalOperator> operatorsVisited) throws AlgebricksException { - DotFormatBuilder.StringValue destinationNodeLabel = formatStringOf(op, dotVisitor); + private void generateNode(DotFormatBuilder dotBuilder, ILogicalOperator op, boolean showDetails, + Set<ILogicalOperator> operatorsVisited) throws AlgebricksException { + DotFormatBuilder.StringValue destinationNodeLabel = formatStringOf(op, showDetails); DotFormatBuilder.Node destinationNode = dotBuilder .createNode(DotFormatBuilder.StringValue.of(Integer.toString(op.hashCode())), destinationNodeLabel); DotFormatBuilder.StringValue sourceNodeLabel; DotFormatBuilder.Node sourceNode; for (Mutable<ILogicalOperator> child : op.getInputs()) { - sourceNodeLabel = formatStringOf(child.getValue(), dotVisitor); + sourceNodeLabel = formatStringOf(child.getValue(), showDetails); sourceNode = dotBuilder.createNode( DotFormatBuilder.StringValue.of(Integer.toString(child.getValue().hashCode())), sourceNodeLabel); dotBuilder.createEdge(sourceNode, destinationNode); if (!operatorsVisited.contains(child.getValue())) { - generateNode(dotBuilder, child.getValue(), dotVisitor, operatorsVisited); + generateNode(dotBuilder, child.getValue(), showDetails, operatorsVisited); } } if (((AbstractLogicalOperator) op).hasNestedPlans()) { ILogicalOperator nestedOperator; for (ILogicalPlan nestedPlan : ((AbstractOperatorWithNestedPlans) op).getNestedPlans()) { nestedOperator = nestedPlan.getRoots().get(0).getValue(); - sourceNodeLabel = formatStringOf(nestedOperator, dotVisitor); + sourceNodeLabel = 
formatStringOf(nestedOperator, showDetails); sourceNode = dotBuilder.createNode( DotFormatBuilder.StringValue.of(Integer.toString(nestedOperator.hashCode())), sourceNodeLabel); dotBuilder.createEdge(sourceNode, destinationNode).setLabel(DotFormatBuilder.StringValue.of("subplan")); if (!operatorsVisited.contains(nestedOperator)) { - generateNode(dotBuilder, nestedOperator, dotVisitor, operatorsVisited); + generateNode(dotBuilder, nestedOperator, showDetails, operatorsVisited); } } } @@ -246,7 +230,7 @@ public class DotFormatGenerator { sourceNode = destinationNode; for (int i = 0; i < replicateOperator.getOutputs().size(); i++) { replicateOutput = replicateOperator.getOutputs().get(i).getValue(); - destinationNodeLabel = formatStringOf(replicateOutput, dotVisitor); + destinationNodeLabel = formatStringOf(replicateOutput, showDetails); destinationNode = dotBuilder.createNode( DotFormatBuilder.StringValue.of(Integer.toString(replicateOutput.hashCode())), destinationNodeLabel); @@ -261,16 +245,52 @@ public class DotFormatGenerator { operatorsVisited.add(op); } - private static DotFormatBuilder.StringValue formatStringOf(ILogicalOperator operator, - LogicalOperatorDotVisitor dotVisitor) throws AlgebricksException { - String formattedString = operator.accept(dotVisitor, null).trim(); - IPhysicalOperator physicalOperator = ((AbstractLogicalOperator) operator).getPhysicalOperator(); - if (physicalOperator != null) { - formattedString += "\\n" + physicalOperator.toString().trim() + " |" + operator.getExecutionMode() + "|"; - } else { - formattedString += "\\n|" + operator.getExecutionMode() + "|"; - } - + private DotFormatBuilder.StringValue formatStringOf(ILogicalOperator operator, boolean showDetails) + throws AlgebricksException { + String formattedString = operator.accept(dotVisitor, showDetails).trim(); return DotFormatBuilder.StringValue.of(formattedString); } + + private static void addToGraph(DotFormatBuilder graph, Set<Constraint> constraints, + 
Map<ConnectorDescriptorId, IConnectorDescriptor> connMap, ConnectorDescriptorId connId, + Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> srcAndDest) { + IConnectorDescriptor connector = connMap.get(connId); + String edgeLabel; + edgeLabel = connector.getClass().getName().substring(connector.getClass().getName().lastIndexOf(".") + 1); + edgeLabel += "-" + connId; + IOperatorDescriptor sourceOp = srcAndDest.getLeft().getLeft(); + IOperatorDescriptor destOp = srcAndDest.getRight().getLeft(); + StringBuilder source = new StringBuilder( + sourceOp.getClass().getName().substring(sourceOp.getClass().getName().lastIndexOf(".") + 1)); + StringBuilder destination = new StringBuilder( + destOp.getClass().getName().substring(destOp.getClass().getName().lastIndexOf(".") + 1)); + // constraints + for (Constraint constraint : constraints) { + LValueConstraintExpression lvalue = constraint.getLValue(); + if (lvalue.getTag() == ConstraintExpression.ExpressionTag.PARTITION_COUNT) { + PartitionCountExpression count = (PartitionCountExpression) lvalue; + if (count.getOperatorDescriptorId().equals(sourceOp.getOperatorId())) { + source.append("\n").append(constraint); + } + if (count.getOperatorDescriptorId().equals(destOp.getOperatorId())) { + destination.append("\n").append(constraint); + } + } else if (lvalue.getTag() == ConstraintExpression.ExpressionTag.PARTITION_LOCATION) { + PartitionLocationExpression location = (PartitionLocationExpression) lvalue; + if (location.getOperatorDescriptorId().equals(sourceOp.getOperatorId())) { + source.append("\n").append(constraint); + } + if (location.getOperatorDescriptorId().equals(destOp.getOperatorId())) { + destination.append("\n").append(constraint); + } + } + } + DotFormatBuilder.Node sourceNode = + graph.createNode(DotFormatBuilder.StringValue.of(sourceOp.getOperatorId().toString()), + DotFormatBuilder.StringValue.of(sourceOp.toString() + "-" + source)); + DotFormatBuilder.Node destinationNode = + 
graph.createNode(DotFormatBuilder.StringValue.of(destOp.getOperatorId().toString()), + DotFormatBuilder.StringValue.of(destOp.toString() + "-" + destination)); + graph.createEdge(sourceNode, destinationNode).setLabel(DotFormatBuilder.StringValue.of(edgeLabel)); + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java index 2cb2d35..113d205 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java @@ -18,14 +18,20 @@ */ package org.apache.hyracks.algebricks.core.utils; +import static org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty.PropertyType.LOCAL_GROUPING_PROPERTY; +import static org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty.PropertyType.LOCAL_ORDER_PROPERTY; + import java.util.List; +import java.util.Map; import org.apache.commons.lang3.mutable.Mutable; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.algebricks.common.utils.Triple; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; +import org.apache.hyracks.algebricks.core.algebra.base.IPhysicalOperator; import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; +import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractUnnestMapOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator; @@ -35,6 +41,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOper import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator; @@ -62,9 +69,14 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOpe import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator; +import org.apache.hyracks.algebricks.core.algebra.properties.DefaultNodeGroupDomain; +import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty; +import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain; +import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty; +import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector; import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor; -public class LogicalOperatorDotVisitor implements 
ILogicalOperatorVisitor<String, Void> { +public class LogicalOperatorDotVisitor implements ILogicalOperatorVisitor<String, Boolean> { private final StringBuilder stringBuilder; @@ -82,161 +94,214 @@ public class LogicalOperatorDotVisitor implements ILogicalOperatorVisitor<String } @Override - public String visitAggregateOperator(AggregateOperator op, Void noArgs) throws AlgebricksException { + public String visitAggregateOperator(AggregateOperator op, Boolean showDetails) throws AlgebricksException { stringBuilder.setLength(0); stringBuilder.append("aggregate ").append(str(op.getVariables())).append(" <- "); - pprintExprList(op.getExpressions()); + printExprList(op.getExpressions()); + appendSchema(op, showDetails); + appendAnnotations(op, showDetails); + appendPhysicalOperatorInfo(op, showDetails); return stringBuilder.toString(); } @Override - public String visitRunningAggregateOperator(RunningAggregateOperator op, Void noArgs) throws AlgebricksException { + public String visitRunningAggregateOperator(RunningAggregateOperator op, Boolean showDetails) + throws AlgebricksException { stringBuilder.setLength(0); stringBuilder.append("running-aggregate ").append(str(op.getVariables())).append(" <- "); - pprintExprList(op.getExpressions()); + printExprList(op.getExpressions()); + appendSchema(op, showDetails); + appendAnnotations(op, showDetails); + appendPhysicalOperatorInfo(op, showDetails); return stringBuilder.toString(); } @Override - public String visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, Void noArgs) throws AlgebricksException { + public String visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, Boolean showDetails) { stringBuilder.setLength(0); stringBuilder.append("empty-tuple-source"); + appendSchema(op, showDetails); + appendAnnotations(op, showDetails); + appendPhysicalOperatorInfo(op, showDetails); return stringBuilder.toString(); } @Override - public String visitGroupByOperator(GroupByOperator op, Void noArgs) throws 
AlgebricksException { + public String visitGroupByOperator(GroupByOperator op, Boolean showDetails) throws AlgebricksException { stringBuilder.setLength(0); stringBuilder.append("group by").append(op.isGroupAll() ? " (all)" : "").append(" ("); - pprintVeList(op.getGroupByList()); + printVariableAndExprList(op.getGroupByList()); stringBuilder.append(") decor ("); - pprintVeList(op.getDecorList()); + printVariableAndExprList(op.getDecorList()); stringBuilder.append(")"); + appendSchema(op, showDetails); + appendAnnotations(op, showDetails); + appendPhysicalOperatorInfo(op, showDetails); return stringBuilder.toString(); } @Override - public String visitDistinctOperator(DistinctOperator op, Void noArgs) throws AlgebricksException { + public String visitDistinctOperator(DistinctOperator op, Boolean showDetails) { stringBuilder.setLength(0); stringBuilder.append("distinct ("); - pprintExprList(op.getExpressions()); + printExprList(op.getExpressions()); stringBuilder.append(")"); + appendSchema(op, showDetails); + appendAnnotations(op, showDetails); + appendPhysicalOperatorInfo(op, showDetails); return stringBuilder.toString(); } @Override - public String visitInnerJoinOperator(InnerJoinOperator op, Void noArgs) throws AlgebricksException { + public String visitInnerJoinOperator(InnerJoinOperator op, Boolean showDetails) throws AlgebricksException { stringBuilder.setLength(0); stringBuilder.append("join (").append(op.getCondition().getValue().toString()).append(")"); + appendSchema(op, showDetails); + appendAnnotations(op, showDetails); + appendPhysicalOperatorInfo(op, showDetails); return stringBuilder.toString(); } @Override - public String visitLeftOuterJoinOperator(LeftOuterJoinOperator op, Void noArgs) throws AlgebricksException { + public String visitLeftOuterJoinOperator(LeftOuterJoinOperator op, Boolean showDetails) throws AlgebricksException { stringBuilder.setLength(0); stringBuilder.append("left outer join 
(").append(op.getCondition().getValue().toString()).append(")"); + appendSchema(op, showDetails); + appendAnnotations(op, showDetails); + appendPhysicalOperatorInfo(op, showDetails); return stringBuilder.toString(); } @Override - public String visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Void noArgs) throws AlgebricksException { + public String visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Boolean showDetails) + throws AlgebricksException { stringBuilder.setLength(0); stringBuilder.append("nested tuple source"); + appendSchema(op, showDetails); + appendAnnotations(op, showDetails); + appendPhysicalOperatorInfo(op, showDetails); return stringBuilder.toString(); } @Override - public String visitOrderOperator(OrderOperator op, Void noArgs) throws AlgebricksException { + public String visitOrderOperator(OrderOperator op, Boolean showDetails) throws AlgebricksException { stringBuilder.setLength(0); stringBuilder.append("order "); for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : op.getOrderExpressions()) { if (op.getTopK() != -1) { stringBuilder.append("(topK: ").append(op.getTopK()).append(") "); } - String fst = getOrderString(p.first); - stringBuilder.append("(").append(fst).append(", ").append(p.second.getValue().toString()).append(") "); + stringBuilder.append("("); + switch (p.first.getKind()) { + case ASC: + stringBuilder.append("ASC"); + break; + case DESC: + stringBuilder.append("DESC"); + break; + default: + final Mutable<ILogicalExpression> expressionRef = p.first.getExpressionRef(); + stringBuilder.append(expressionRef == null ? 
"null" : expressionRef.toString()); + } + stringBuilder.append(", ").append(p.second.getValue().toString()).append(") "); } + appendSchema(op, showDetails); + appendAnnotations(op, showDetails); + appendPhysicalOperatorInfo(op, showDetails); return stringBuilder.toString(); } - private String getOrderString(OrderOperator.IOrder first) { - switch (first.getKind()) { - case ASC: - return "ASC"; - case DESC: - return "DESC"; - default: - return first.getExpressionRef().toString(); - } - } - @Override - public String visitAssignOperator(AssignOperator op, Void noArgs) throws AlgebricksException { + public String visitAssignOperator(AssignOperator op, Boolean showDetails) throws AlgebricksException { stringBuilder.setLength(0); stringBuilder.append("assign ").append(str(op.getVariables())).append(" <- "); - pprintExprList(op.getExpressions()); + printExprList(op.getExpressions()); + appendSchema(op, showDetails); + appendAnnotations(op, showDetails); + appendPhysicalOperatorInfo(op, showDetails); return stringBuilder.toString(); } @Override - public String visitWriteOperator(WriteOperator op, Void noArgs) throws AlgebricksException { + public String visitWriteOperator(WriteOperator op, Boolean showDetails) { stringBuilder.setLength(0); stringBuilder.append("write "); - pprintExprList(op.getExpressions()); + printExprList(op.getExpressions()); + appendSchema(op, showDetails); + appendAnnotations(op, showDetails); + appendPhysicalOperatorInfo(op, showDetails); return stringBuilder.toString(); } @Override - public String visitDistributeResultOperator(DistributeResultOperator op, Void noArgs) throws AlgebricksException { + public String visitDistributeResultOperator(DistributeResultOperator op, Boolean showDetails) { stringBuilder.setLength(0); stringBuilder.append("distribute result "); - pprintExprList(op.getExpressions()); + printExprList(op.getExpressions()); + appendSchema(op, showDetails); + appendAnnotations(op, showDetails); + appendPhysicalOperatorInfo(op, 
showDetails); return stringBuilder.toString(); } @Override - public String visitWriteResultOperator(WriteResultOperator op, Void noArgs) throws AlgebricksException { + public String visitWriteResultOperator(WriteResultOperator op, Boolean showDetails) { stringBuilder.setLength(0); stringBuilder.append("load ").append(str(op.getDataSource())).append(" from ") .append(op.getPayloadExpression().getValue().toString()).append(" partitioned by "); - pprintExprList(op.getKeyExpressions()); + printExprList(op.getKeyExpressions()); + appendSchema(op, showDetails); + appendAnnotations(op, showDetails); + appendPhysicalOperatorInfo(op, showDetails); return stringBuilder.toString(); } @Override - public String visitSelectOperator(SelectOperator op, Void noArgs) throws AlgebricksException { + public String visitSelectOperator(SelectOperator op, Boolean showDetails) { stringBuilder.setLength(0); stringBuilder.append("select (").append(op.getCondition().getValue().toString()).append(")"); + appendSchema(op, showDetails); + appendAnnotations(op, showDetails); + appendPhysicalOperatorInfo(op, showDetails); return stringBuilder.toString(); } @Override - public String visitProjectOperator(ProjectOperator op, Void noArgs) throws AlgebricksException { + public String visitProjectOperator(ProjectOperator op, Boolean showDetails) throws AlgebricksException { stringBuilder.setLength(0); stringBuilder.append("project ").append("(").append(op.getVariables()).append(")"); + appendSchema(op, showDetails); + appendAnnotations(op, showDetails); + appendPhysicalOperatorInfo(op, showDetails); return stringBuilder.toString(); } @Override - public String visitSubplanOperator(SubplanOperator op, Void noArgs) throws AlgebricksException { + public String visitSubplanOperator(SubplanOperator op, Boolean showDetails) throws AlgebricksException { stringBuilder.setLength(0); stringBuilder.append("subplan {}"); + appendSchema(op, showDetails); + appendAnnotations(op, showDetails); + 
appendPhysicalOperatorInfo(op, showDetails); return stringBuilder.toString(); } @Override - public String visitUnionOperator(UnionAllOperator op, Void noArgs) throws AlgebricksException { + public String visitUnionOperator(UnionAllOperator op, Boolean showDetails) throws AlgebricksException { stringBuilder.setLength(0); stringBuilder.append("union"); for (Triple<LogicalVariable, LogicalVariable, LogicalVariable> v : op.getVariableMappings()) { stringBuilder.append(" (").append(v.first).append(", ").append(v.second).append(", ").append(v.third) .append(")"); } + appendSchema(op, showDetails); + appendAnnotations(op, showDetails); + appendPhysicalOperatorInfo(op, showDetails); return stringBuilder.toString(); } @Override - public String visitIntersectOperator(IntersectOperator op, Void noArgs) throws AlgebricksException { + public String visitIntersectOperator(IntersectOperator op, Boolean showDetails) throws AlgebricksException { stringBuilder.setLength(0); stringBuilder.append("intersect ("); stringBuilder.append('['); @@ -261,154 +326,183 @@ public class LogicalOperatorDotVisitor implements ILogicalOperatorVisitor<String stringBuilder.append(']'); } stringBuilder.append("])"); + appendSchema(op, showDetails); + appendAnnotations(op, showDetails); + appendPhysicalOperatorInfo(op, showDetails); return stringBuilder.toString(); } @Override - public String visitUnnestOperator(UnnestOperator op, Void noArgs) throws AlgebricksException { + public String visitUnnestOperator(UnnestOperator op, Boolean showDetails) throws AlgebricksException { stringBuilder.setLength(0); stringBuilder.append("unnest ").append(op.getVariable()); if (op.getPositionalVariable() != null) { stringBuilder.append(" at ").append(op.getPositionalVariable()); } stringBuilder.append(" <- ").append(op.getExpressionRef().getValue().toString()); + appendSchema(op, showDetails); + appendAnnotations(op, showDetails); + appendPhysicalOperatorInfo(op, showDetails); return stringBuilder.toString(); } 
@Override - public String visitLeftOuterUnnestOperator(LeftOuterUnnestOperator op, Void noArgs) throws AlgebricksException { + public String visitLeftOuterUnnestOperator(LeftOuterUnnestOperator op, Boolean showDetails) + throws AlgebricksException { stringBuilder.setLength(0); stringBuilder.append("outer-unnest ").append(op.getVariable()); if (op.getPositionalVariable() != null) { stringBuilder.append(" at ").append(op.getPositionalVariable()); } stringBuilder.append(" <- ").append(op.getExpressionRef().getValue().toString()); + appendSchema(op, showDetails); + appendAnnotations(op, showDetails); + appendPhysicalOperatorInfo(op, showDetails); return stringBuilder.toString(); } @Override - public String visitUnnestMapOperator(UnnestMapOperator op, Void noArgs) throws AlgebricksException { + public String visitUnnestMapOperator(UnnestMapOperator op, Boolean showDetails) throws AlgebricksException { stringBuilder.setLength(0); - printAbstractUnnestMapOperator(op, "unnest-map"); - appendSelectConditionInformation(stringBuilder, op.getSelectCondition()); - appendLimitInformation(stringBuilder, op.getOutputLimit()); + printAbstractUnnestMapOperator(op, "unnest-map", showDetails); + appendSelectConditionInformation(op.getSelectCondition()); + appendLimitInformation(op.getOutputLimit()); return stringBuilder.toString(); } @Override - public String visitLeftOuterUnnestMapOperator(LeftOuterUnnestMapOperator op, Void noArgs) + public String visitLeftOuterUnnestMapOperator(LeftOuterUnnestMapOperator op, Boolean showDetails) throws AlgebricksException { stringBuilder.setLength(0); - printAbstractUnnestMapOperator(op, "left-outer-unnest-map"); + printAbstractUnnestMapOperator(op, "left-outer-unnest-map", showDetails); return stringBuilder.toString(); } - private void printAbstractUnnestMapOperator(AbstractUnnestMapOperator op, String opSignature) { + private void printAbstractUnnestMapOperator(AbstractUnnestMapOperator op, String opSignature, boolean show) { 
stringBuilder.append(opSignature).append(" ").append(op.getVariables()).append(" <- ") .append(op.getExpressionRef().getValue().toString()); - appendFilterInformation(stringBuilder, op.getMinFilterVars(), op.getMaxFilterVars()); + appendFilterInformation(op.getMinFilterVars(), op.getMaxFilterVars()); + appendSchema(op, show); + appendAnnotations(op, show); + appendPhysicalOperatorInfo(op, show); } @Override - public String visitDataScanOperator(DataSourceScanOperator op, Void noArgs) throws AlgebricksException { + public String visitDataScanOperator(DataSourceScanOperator op, Boolean showDetails) throws AlgebricksException { stringBuilder.setLength(0); stringBuilder.append("data-scan ").append(op.getProjectVariables()).append("<-").append(op.getVariables()) .append(" <- ").append(op.getDataSource()); - appendFilterInformation(stringBuilder, op.getMinFilterVars(), op.getMaxFilterVars()); - appendSelectConditionInformation(stringBuilder, op.getSelectCondition()); - appendLimitInformation(stringBuilder, op.getOutputLimit()); + appendFilterInformation(op.getMinFilterVars(), op.getMaxFilterVars()); + appendSelectConditionInformation(op.getSelectCondition()); + appendLimitInformation(op.getOutputLimit()); + appendSchema(op, showDetails); + appendAnnotations(op, showDetails); + appendPhysicalOperatorInfo(op, showDetails); return stringBuilder.toString(); } - private void appendFilterInformation(StringBuilder plan, List<LogicalVariable> minFilterVars, - List<LogicalVariable> maxFilterVars) { + private void appendFilterInformation(List<LogicalVariable> minFilterVars, List<LogicalVariable> maxFilterVars) { if (minFilterVars != null || maxFilterVars != null) { - plan.append(" with filter on"); + stringBuilder.append(" with filter on"); } if (minFilterVars != null) { - plan.append(" min:").append(minFilterVars); + stringBuilder.append(" min:").append(minFilterVars); } if (maxFilterVars != null) { - plan.append(" max:").append(maxFilterVars); + stringBuilder.append(" 
max:").append(maxFilterVars); } } - private Void appendSelectConditionInformation(StringBuilder plan, Mutable<ILogicalExpression> condition) - throws AlgebricksException { + private void appendSelectConditionInformation(Mutable<ILogicalExpression> condition) throws AlgebricksException { if (condition != null) { - plan.append(" condition:").append(condition.getValue().toString()); + stringBuilder.append(" condition:").append(condition.getValue().toString()); } - return null; } - private Void appendLimitInformation(StringBuilder plan, long outputLimit) throws AlgebricksException { + private void appendLimitInformation(long outputLimit) throws AlgebricksException { if (outputLimit >= 0) { - plan.append(" limit:").append(String.valueOf(outputLimit)); + stringBuilder.append(" limit:").append(String.valueOf(outputLimit)); } - return null; } @Override - public String visitLimitOperator(LimitOperator op, Void noArgs) throws AlgebricksException { + public String visitLimitOperator(LimitOperator op, Boolean showDetails) throws AlgebricksException { stringBuilder.setLength(0); stringBuilder.append("limit ").append(op.getMaxObjects().getValue().toString()); ILogicalExpression offset = op.getOffset().getValue(); if (offset != null) { stringBuilder.append(", ").append(offset.toString()); } + appendSchema(op, showDetails); + appendAnnotations(op, showDetails); + appendPhysicalOperatorInfo(op, showDetails); return stringBuilder.toString(); } @Override - public String visitExchangeOperator(ExchangeOperator op, Void noArgs) throws AlgebricksException { + public String visitExchangeOperator(ExchangeOperator op, Boolean showDetails) throws AlgebricksException { stringBuilder.setLength(0); stringBuilder.append("exchange"); + appendSchema(op, showDetails); + appendAnnotations(op, showDetails); + appendPhysicalOperatorInfo(op, showDetails); return stringBuilder.toString(); } @Override - public String visitScriptOperator(ScriptOperator op, Void noArgs) throws AlgebricksException { + 
public String visitScriptOperator(ScriptOperator op, Boolean showDetails) throws AlgebricksException { stringBuilder.setLength(0); stringBuilder.append("script (in: ").append(op.getInputVariables()).append(") (out: ") .append(op.getOutputVariables()).append(")"); + appendSchema(op, showDetails); + appendAnnotations(op, showDetails); + appendPhysicalOperatorInfo(op, showDetails); return stringBuilder.toString(); } @Override - public String visitReplicateOperator(ReplicateOperator op, Void noArgs) throws AlgebricksException { + public String visitReplicateOperator(ReplicateOperator op, Boolean showDetails) throws AlgebricksException { stringBuilder.setLength(0); stringBuilder.append("replicate"); + appendSchema(op, showDetails); + appendAnnotations(op, showDetails); + appendPhysicalOperatorInfo(op, showDetails); return stringBuilder.toString(); } @Override - public String visitSplitOperator(SplitOperator op, Void noArgs) throws AlgebricksException { + public String visitSplitOperator(SplitOperator op, Boolean showDetails) throws AlgebricksException { stringBuilder.setLength(0); Mutable<ILogicalExpression> branchingExpression = op.getBranchingExpression(); stringBuilder.append("split ").append(branchingExpression.getValue().toString()); + appendSchema(op, showDetails); + appendAnnotations(op, showDetails); + appendPhysicalOperatorInfo(op, showDetails); return stringBuilder.toString(); } @Override - public String visitMaterializeOperator(MaterializeOperator op, Void noArgs) throws AlgebricksException { + public String visitMaterializeOperator(MaterializeOperator op, Boolean showDetails) throws AlgebricksException { stringBuilder.setLength(0); stringBuilder.append("materialize"); + appendSchema(op, showDetails); + appendAnnotations(op, showDetails); + appendPhysicalOperatorInfo(op, showDetails); return stringBuilder.toString(); } @Override - public String visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, Void noArgs) - throws AlgebricksException { + public 
String visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, Boolean showDetails) { stringBuilder.setLength(0); String header = getIndexOpString(op.getOperation()); stringBuilder.append(header).append(str(op.getDataSource())).append(" from record: ") .append(op.getPayloadExpression().getValue().toString()); if (op.getAdditionalNonFilteringExpressions() != null) { stringBuilder.append(", meta: "); - pprintExprList(op.getAdditionalNonFilteringExpressions()); + printExprList(op.getAdditionalNonFilteringExpressions()); } stringBuilder.append(" partitioned by "); - pprintExprList(op.getPrimaryKeyExpressions()); + printExprList(op.getPrimaryKeyExpressions()); if (op.getOperation() == Kind.UPSERT) { stringBuilder.append(" out: ([record-before-upsert:").append(op.getBeforeOpRecordVar()); if (op.getBeforeOpAdditionalNonFilteringVars() != null) { @@ -419,27 +513,32 @@ public class LogicalOperatorDotVisitor implements ILogicalOperatorVisitor<String if (op.isBulkload()) { stringBuilder.append(" [bulkload]"); } + appendSchema(op, showDetails); + appendAnnotations(op, showDetails); + appendPhysicalOperatorInfo(op, showDetails); return stringBuilder.toString(); } @Override - public String visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, Void noArgs) - throws AlgebricksException { + public String visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, Boolean showDetails) { stringBuilder.setLength(0); String header = getIndexOpString(op.getOperation()); stringBuilder.append(header).append(op.getIndexName()).append(" on ") .append(str(op.getDataSourceIndex().getDataSource())).append(" from "); if (op.getOperation() == Kind.UPSERT) { stringBuilder.append(" replace:"); - pprintExprList(op.getPrevSecondaryKeyExprs()); + printExprList(op.getPrevSecondaryKeyExprs()); stringBuilder.append(" with:"); - pprintExprList(op.getSecondaryKeyExpressions()); + printExprList(op.getSecondaryKeyExpressions()); } else { - 
pprintExprList(op.getSecondaryKeyExpressions()); + printExprList(op.getSecondaryKeyExpressions()); } if (op.isBulkload()) { stringBuilder.append(" [bulkload]"); } + appendSchema(op, showDetails); + appendAnnotations(op, showDetails); + appendPhysicalOperatorInfo(op, showDetails); return stringBuilder.toString(); } @@ -452,60 +551,143 @@ public class LogicalOperatorDotVisitor implements ILogicalOperatorVisitor<String case UPSERT: return "upsert into "; } - return null; + return ""; } @Override - public String visitTokenizeOperator(TokenizeOperator op, Void noArgs) throws AlgebricksException { + public String visitTokenizeOperator(TokenizeOperator op, Boolean showDetails) throws AlgebricksException { stringBuilder.setLength(0); stringBuilder.append("tokenize ").append(str(op.getTokenizeVars())).append(" <- "); - pprintExprList(op.getSecondaryKeyExpressions()); + printExprList(op.getSecondaryKeyExpressions()); + appendSchema(op, showDetails); + appendAnnotations(op, showDetails); + appendPhysicalOperatorInfo(op, showDetails); return stringBuilder.toString(); } @Override - public String visitSinkOperator(SinkOperator op, Void noArgs) throws AlgebricksException { + public String visitSinkOperator(SinkOperator op, Boolean showDetails) { stringBuilder.setLength(0); stringBuilder.append("sink"); + appendSchema(op, showDetails); + appendAnnotations(op, showDetails); + appendPhysicalOperatorInfo(op, showDetails); return stringBuilder.toString(); } @Override - public String visitDelegateOperator(DelegateOperator op, Void noArgs) throws AlgebricksException { + public String visitDelegateOperator(DelegateOperator op, Boolean showDetails) throws AlgebricksException { stringBuilder.setLength(0); stringBuilder.append(op.toString()); + appendSchema(op, showDetails); + appendAnnotations(op, showDetails); + appendPhysicalOperatorInfo(op, showDetails); + return stringBuilder.toString(); + } + + @Override + public String visitForwardOperator(ForwardOperator op, Boolean showDetails) 
throws AlgebricksException { + stringBuilder.setLength(0); + stringBuilder.append("forward(").append(op.getRangeMapExpression().getValue().toString()).append(")"); + appendSchema(op, showDetails); + appendAnnotations(op, showDetails); + appendPhysicalOperatorInfo(op, showDetails); return stringBuilder.toString(); } - private void pprintExprList(List<Mutable<ILogicalExpression>> expressions) { + private void printExprList(List<Mutable<ILogicalExpression>> expressions) { + stringBuilder.append("["); + expressions.forEach(exprRef -> stringBuilder.append(exprRef.getValue().toString()).append(", ")); + stringBuilder.append("]"); + } + + private void printVariableAndExprList(List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> variableExprList) { stringBuilder.append("["); boolean first = true; - for (Mutable<ILogicalExpression> exprRef : expressions) { + for (Pair<LogicalVariable, Mutable<ILogicalExpression>> variableExpressionPair : variableExprList) { if (first) { first = false; } else { - stringBuilder.append(", "); + stringBuilder.append("; "); + } + if (variableExpressionPair.first != null) { + stringBuilder.append(variableExpressionPair.first).append(" := ").append(variableExpressionPair.second); + } else { + stringBuilder.append(variableExpressionPair.second.getValue().toString()); } - stringBuilder.append(exprRef.getValue().toString()); } stringBuilder.append("]"); } - private void pprintVeList(List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> vePairList) { - stringBuilder.append("["); - boolean fst = true; - for (Pair<LogicalVariable, Mutable<ILogicalExpression>> ve : vePairList) { - if (fst) { - fst = false; - } else { - stringBuilder.append("; "); + private void appendSchema(AbstractLogicalOperator op, boolean show) { + if (show) { + stringBuilder.append("\\nSchema: "); + final List<LogicalVariable> schema = op.getSchema(); + stringBuilder.append(schema == null ? 
"null" : schema); + } + } + + private void appendAnnotations(AbstractLogicalOperator op, boolean show) { + if (show) { + final Map<String, Object> annotations = op.getAnnotations(); + if (!annotations.isEmpty()) { + stringBuilder.append("\\nAnnotations: ").append(annotations); } - if (ve.first != null) { - stringBuilder.append(ve.first).append(" := ").append(ve.second); - } else { - stringBuilder.append(ve.second.getValue().toString()); + } + } + + private void appendPhysicalOperatorInfo(AbstractLogicalOperator op, boolean show) { + IPhysicalOperator physicalOp = op.getPhysicalOperator(); + stringBuilder.append("\\n").append(physicalOp == null ? "null" : physicalOp.toString().trim()); + stringBuilder.append(", Exec: ").append(op.getExecutionMode()); + if (show) { + IPhysicalPropertiesVector properties = physicalOp == null ? null : physicalOp.getDeliveredProperties(); + List<ILocalStructuralProperty> localProp = properties == null ? null : properties.getLocalProperties(); + IPartitioningProperty partitioningProp = properties == null ? null : properties.getPartitioningProperty(); + if (localProp != null) { + stringBuilder.append("\\nProperties in each partition: ["); + for (ILocalStructuralProperty property : localProp) { + if (property == null) { + stringBuilder.append("null, "); + } else if (property.getPropertyType() == LOCAL_ORDER_PROPERTY) { + stringBuilder.append("ordered by "); + } else if (property.getPropertyType() == LOCAL_GROUPING_PROPERTY) { + stringBuilder.append("group by "); + } + stringBuilder.append(property).append(", "); + } + stringBuilder.append("]"); + } + + if (partitioningProp != null) { + stringBuilder.append("\\n").append(partitioningProp.getPartitioningType()).append(":"); + INodeDomain nodeDomain = partitioningProp.getNodeDomain(); + stringBuilder.append("\\n "); + if (nodeDomain != null && nodeDomain.cardinality() != null) { + stringBuilder.append(nodeDomain.cardinality()).append(" partitions. 
"); + } + switch (partitioningProp.getPartitioningType()) { + case BROADCAST: + stringBuilder.append("Data is broadcast to partitions."); + break; + case RANDOM: + stringBuilder.append("Data is randomly partitioned."); + break; + case ORDERED_PARTITIONED: + stringBuilder.append("Data is orderly partitioned via a range."); + break; + case UNORDERED_PARTITIONED: + stringBuilder.append("Data is hash partitioned."); + break; + case UNPARTITIONED: + stringBuilder.append("Data is in one place."); + } + if (nodeDomain instanceof DefaultNodeGroupDomain) { + DefaultNodeGroupDomain nd = (DefaultNodeGroupDomain) nodeDomain; + stringBuilder.append("\\n").append(nd); + } } } - stringBuilder.append("]"); + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/80225e2c/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java index 6f7f86a..cdab2f4 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java @@ -21,9 +21,11 @@ package org.apache.hyracks.algebricks.rewriter.rules; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.UUID; import org.apache.commons.lang3.mutable.Mutable; import 
org.apache.commons.lang3.mutable.MutableObject; @@ -40,28 +42,41 @@ import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag; import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; import org.apache.hyracks.algebricks.core.algebra.base.OperatorAnnotations; import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag; +import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression; +import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExpression; +import org.apache.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression; import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; +import org.apache.hyracks.algebricks.core.algebra.functions.IFunctionInfo; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder; import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.FDsAndEquivClassesVisitor; import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractPreSortedDistinctByPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractPreclusteredGroupByPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractStableSortPOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.physical.AggregatePOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.BroadcastExchangePOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.ExternalGroupByPOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.physical.ForwardPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionExchangePOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.HashPartitionMergeExchangePOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.InMemoryStableSortPOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.physical.OneToOneExchangePOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.RandomMergeExchangePOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.RandomPartitionExchangePOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.RangePartitionExchangePOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.RangePartitionMergeExchangePOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.physical.ReplicatePOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.physical.SequentialMergeExchangePOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.SortMergeExchangePOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.StableSortPOperator; import org.apache.hyracks.algebricks.core.algebra.prettyprint.LogicalOperatorPrettyPrintVisitor; @@ -90,13 +105,20 @@ import 
org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule; import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig; import org.apache.hyracks.algebricks.rewriter.util.PhysicalOptimizationsUtil; import org.apache.hyracks.api.exceptions.SourceLocation; -import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap; +import org.apache.hyracks.dataflow.common.data.partition.range.RangeMap; public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule { private static final String HASH_MERGE = "hash_merge"; private static final String TRUE_CONSTANT = "true"; private PhysicalOptimizationConfig physicalOptimizationConfig; + private final FunctionIdentifier rangeMapFunction; + private final FunctionIdentifier localSamplingFun; + + public EnforceStructuralPropertiesRule(FunctionIdentifier rangeMapFunction, FunctionIdentifier localSamplingFun) { + this.rangeMapFunction = rangeMapFunction; + this.localSamplingFun = localSamplingFun; + } @Override public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) @@ -204,6 +226,7 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule { boolean changed = false; AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue(); + optimizeUsingConstraintsAndEquivClasses(op); PhysicalRequirements pr = op.getRequiredPhysicalPropertiesForChildren(required, context); IPhysicalPropertiesVector[] reqdProperties = null; @@ -214,26 +237,19 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule { // compute properties and figure out the domain INodeDomain childrenDomain = null; - { - int j = 0; - for (Mutable<ILogicalOperator> childRef : op.getInputs()) { - AbstractLogicalOperator child = (AbstractLogicalOperator) childRef.getValue(); - // recursive call - if (physOptimizeOp(childRef, reqdProperties[j], nestedPlan, context)) { - changed = true; - } - 
child.computeDeliveredPhysicalProperties(context); - IPhysicalPropertiesVector delivered = child.getDeliveredPhysicalProperties(); - if (childrenDomain == null) { - childrenDomain = delivered.getPartitioningProperty().getNodeDomain(); - } else { - INodeDomain dom2 = delivered.getPartitioningProperty().getNodeDomain(); - if (!childrenDomain.sameAs(dom2)) { - childrenDomain = context.getComputationNodeDomain(); - } - } - j++; + int j = 0; + for (Mutable<ILogicalOperator> childRef : op.getInputs()) { + AbstractLogicalOperator child = (AbstractLogicalOperator) childRef.getValue(); + changed |= physOptimizeOp(childRef, reqdProperties[j], nestedPlan, context); + child.computeDeliveredPhysicalProperties(context); + IPhysicalPropertiesVector delivered = child.getDeliveredPhysicalProperties(); + INodeDomain childDomain = delivered.getPartitioningProperty().getNodeDomain(); + if (childrenDomain == null) { + childrenDomain = delivered.getPartitioningProperty().getNodeDomain(); + } else if (!childrenDomain.sameAs(childDomain)) { + childrenDomain = context.getComputationNodeDomain(); } + j++; } if (reqdProperties != null) { @@ -252,7 +268,7 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule { int startChildIndex = getStartChildIndex(op, pr, nestedPlan, context); IPartitioningProperty firstDeliveredPartitioning = null; // Enforce data properties in a top-down manner. - for (int j = 0; j < op.getInputs().size(); j++) { + for (j = 0; j < op.getInputs().size(); j++) { // Starts from a partitioning-compatible child if any to loop over all children. int childIndex = (j + startChildIndex) % op.getInputs().size(); IPhysicalPropertiesVector requiredProperty = reqdProperties[childIndex]; @@ -555,6 +571,17 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule { return new MutableObject<ILogicalOperator>(oo); } + /** + * Adds exchange operators (connectors) between {@code op} & its child at index {@code childIdx}. 
+ * @param op the parent operator that is requiring a specific kind of connector at its child + * @param i the child index where we want to have the connector + * @param pp the required partitioning property at that child (i.e. the required connector) + * @param required the physical properties required at that child (partitioning + local properties) + * @param deliveredByChild the physical properties delivered by that child (partitioning + local properties) + * @param domain the destination domain of nodes that we want the connector to connect to + * @param context {@link IOptimizationContext} + * @throws AlgebricksException + */ private void addPartitioningEnforcers(ILogicalOperator op, int i, IPartitioningProperty pp, IPhysicalPropertiesVector required, IPhysicalPropertiesVector deliveredByChild, INodeDomain domain, IOptimizationContext context) throws AlgebricksException { @@ -562,52 +589,15 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule { IPhysicalOperator pop; switch (pp.getPartitioningType()) { case UNPARTITIONED: { - List<OrderColumn> ordCols = computeOrderColumns(deliveredByChild); - if (ordCols.isEmpty()) { - pop = new RandomMergeExchangePOperator(); - } else { - if (op.getAnnotations().containsKey(OperatorAnnotations.USE_RANGE_CONNECTOR)) { - IRangeMap rangeMap = - (IRangeMap) op.getAnnotations().get(OperatorAnnotations.USE_RANGE_CONNECTOR); - pop = new RangePartitionMergeExchangePOperator(ordCols, domain, rangeMap); - } else { - OrderColumn[] sortColumns = new OrderColumn[ordCols.size()]; - sortColumns = ordCols.toArray(sortColumns); - pop = new SortMergeExchangePOperator(sortColumns); - } - } + pop = createMergingConnector(op, domain, deliveredByChild); break; } case UNORDERED_PARTITIONED: { - List<LogicalVariable> varList = new ArrayList<>(((UnorderedPartitionedProperty) pp).getColumnSet()); - String hashMergeHint = (String) context.getMetadataProvider().getConfig().get(HASH_MERGE); - if (hashMergeHint == null || 
!hashMergeHint.equalsIgnoreCase(TRUE_CONSTANT)) { - pop = new HashPartitionExchangePOperator(varList, domain); - break; - } - List<ILocalStructuralProperty> cldLocals = deliveredByChild.getLocalProperties(); - List<ILocalStructuralProperty> reqdLocals = required.getLocalProperties(); - boolean propWasSet = false; - pop = null; - if (reqdLocals != null && cldLocals != null && allAreOrderProps(cldLocals)) { - AbstractLogicalOperator c = (AbstractLogicalOperator) op.getInputs().get(i).getValue(); - Map<LogicalVariable, EquivalenceClass> ecs = context.getEquivalenceClassMap(c); - List<FunctionalDependency> fds = context.getFDList(c); - if (PropertiesUtil.matchLocalProperties(reqdLocals, cldLocals, ecs, fds)) { - List<OrderColumn> orderColumns = - getOrderColumnsFromGroupingProperties(reqdLocals, cldLocals); - pop = new HashPartitionMergeExchangePOperator(orderColumns, varList, domain); - propWasSet = true; - } - } - if (!propWasSet) { - pop = new HashPartitionExchangePOperator(varList, domain); - } + pop = createHashConnector(context, deliveredByChild, domain, required, pp, i, op); break; } case ORDERED_PARTITIONED: { - pop = new RangePartitionExchangePOperator(((OrderedPartitionedProperty) pp).getOrderColumns(), - domain, null); + pop = createRangePartitionerConnector((AbstractLogicalOperator) op, domain, pp, i, context); break; } case BROADCAST: { @@ -640,6 +630,264 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule { } } + private IPhysicalOperator createMergingConnector(ILogicalOperator parentOp, INodeDomain domain, + IPhysicalPropertiesVector deliveredByChild) { + IPhysicalOperator mergingConnector; + List<OrderColumn> ordCols = computeOrderColumns(deliveredByChild); + if (ordCols.isEmpty()) { + IPartitioningProperty partitioningDeliveredByChild = deliveredByChild.getPartitioningProperty(); + if (partitioningDeliveredByChild.getPartitioningType() == PartitioningType.ORDERED_PARTITIONED) { + mergingConnector = new 
SequentialMergeExchangePOperator();
+            } else {
+                mergingConnector = new RandomMergeExchangePOperator();
+            }
+        } else {
+            if (parentOp.getAnnotations().containsKey(OperatorAnnotations.USE_STATIC_RANGE)) {
+                RangeMap rangeMap = (RangeMap) parentOp.getAnnotations().get(OperatorAnnotations.USE_STATIC_RANGE);
+                mergingConnector = new RangePartitionMergeExchangePOperator(ordCols, domain, rangeMap);
+            } else {
+                OrderColumn[] sortColumns = new OrderColumn[ordCols.size()];
+                sortColumns = ordCols.toArray(sortColumns);
+                mergingConnector = new SortMergeExchangePOperator(sortColumns);
+            }
+        }
+        return mergingConnector;
+    }
+    /**
+     * Creates the hash-partitioning connector that enforces {@code rqdPartitioning} at input {@code childIndex} of
+     * {@code parentOp}. The hash columns are the column set of {@code rqdPartitioning}.
+     * A plain {@link HashPartitionExchangePOperator} is returned unless the metadata provider config maps
+     * {@code HASH_MERGE} to {@code TRUE_CONSTANT} (compared case-insensitively). When that hint is on, a
+     * {@link HashPartitionMergeExchangePOperator} is returned if both of the following hold:
+     * the child's delivered local properties are all order properties, and they match the required local
+     * properties (checked with the child's equivalence classes and functional dependencies).
+     * That operator presumably keeps the delivered order across the exchange. Otherwise the method falls back to
+     * the plain hash exchange.
+     * NOTE(review): {@code rqdPartitioning} is cast to {@link UnorderedPartitionedProperty} and the HASH_MERGE config
+     * value is cast to {@code String} unconditionally - confirm callers guarantee both.
+     * @param ctx optimization context (source of the metadata provider config, equivalence classes and FDs)
+     * @param deliveredByChild physical properties delivered by the child
+     * @param domain target node domain of the connector
+     * @param requiredAtChild physical properties required at the child
+     * @param rqdPartitioning the required partitioning property; its column set becomes the hash columns
+     * @param childIndex index of the input of {@code parentOp} that needs the connector
+     * @param parentOp the operator whose input needs the connector
+     * @return a {@link HashPartitionMergeExchangePOperator} or a {@link HashPartitionExchangePOperator}
+     */
+    private IPhysicalOperator createHashConnector(IOptimizationContext ctx, IPhysicalPropertiesVector deliveredByChild,
+            INodeDomain domain, IPhysicalPropertiesVector requiredAtChild, IPartitioningProperty rqdPartitioning,
+            int childIndex, ILogicalOperator parentOp) {
+        IPhysicalOperator hashConnector;
+        List<LogicalVariable> vars = new ArrayList<>(((UnorderedPartitionedProperty) rqdPartitioning).getColumnSet());
+        String hashMergeHint = (String) ctx.getMetadataProvider().getConfig().get(HASH_MERGE);
+        if (hashMergeHint == null || !hashMergeHint.equalsIgnoreCase(TRUE_CONSTANT)) {
+            hashConnector = new HashPartitionExchangePOperator(vars, domain);
+            return hashConnector;
+        }
+        List<ILocalStructuralProperty> cldLocals = deliveredByChild.getLocalProperties();
+        List<ILocalStructuralProperty> reqdLocals = requiredAtChild.getLocalProperties();
+        boolean propWasSet = false;
+        hashConnector = null;
+        if (reqdLocals != null && cldLocals != null && allAreOrderProps(cldLocals)) {
+            AbstractLogicalOperator c = (AbstractLogicalOperator) parentOp.getInputs().get(childIndex).getValue();
+            Map<LogicalVariable, EquivalenceClass> ecs = ctx.getEquivalenceClassMap(c);
+            List<FunctionalDependency> fds = ctx.getFDList(c);
+            if (PropertiesUtil.matchLocalProperties(reqdLocals, cldLocals, ecs, fds)) {
+                List<OrderColumn> orderColumns = getOrderColumnsFromGroupingProperties(reqdLocals, cldLocals);
+                hashConnector = new HashPartitionMergeExchangePOperator(orderColumns, vars, domain);
+                propWasSet = true;
+            }
+        }
+        if (!propWasSet) {
+            hashConnector = new HashPartitionExchangePOperator(vars, domain);
+        }
+        return hashConnector;
+    }
+
+    /**
+     * Creates a range-based exchange operator.
+     * If {@code parentOp} carries the {@code OperatorAnnotations.USE_STATIC_RANGE} annotation, a
+     * {@link RangePartitionExchangePOperator} over that static {@link RangeMap} is returned. Otherwise the plan under
+     * {@code parentOp} is rewritten so that the range map is computed at run time; see
+     * {@link #createDynamicRangePartitionExchangePOperator}.
+     * @param parentOp the operator requiring range-based partitioner to have input tuples repartitioned using a range
+     * @param domain the target node domain of the range-based partitioner
+     * @param requiredPartitioning the required partitioning. It is cast unconditionally to
+     *            {@link OrderedPartitionedProperty}, and its order columns become the partitioning columns.
+     * @param childIndex the index of the child at which the required partitioning is needed
+     * @param ctx optimization context
+     * @return a range-based exchange operator
+     * @throws AlgebricksException propagated from building the dynamic (run-time) range map plan
+     */
+    private IPhysicalOperator createRangePartitionerConnector(AbstractLogicalOperator parentOp, INodeDomain domain,
+            IPartitioningProperty requiredPartitioning, int childIndex, IOptimizationContext ctx)
+            throws AlgebricksException {
+        // options for range partitioning: 1. static range map, 2. dynamic range map computed at run time
+        List<OrderColumn> partitioningColumns = ((OrderedPartitionedProperty) requiredPartitioning).getOrderColumns();
+        if (parentOp.getAnnotations().containsKey(OperatorAnnotations.USE_STATIC_RANGE)) {
+            // TODO(ali): static range map implementation should be fixed to require ORDERED_PARTITION and come here.
+            RangeMap rangeMap = (RangeMap) parentOp.getAnnotations().get(OperatorAnnotations.USE_STATIC_RANGE);
+            return new RangePartitionExchangePOperator(partitioningColumns, domain, rangeMap);
+        } else {
+            return createDynamicRangePartitionExchangePOperator(parentOp, ctx, domain, partitioningColumns, childIndex);
+        }
+    }
+    /**
+     * Rewrites the plan under {@code parentOp} so that a range map is computed at run time from a sample of the input.
+     * Returns the range-partitioning exchange that consumes that range map.
+     * The input of {@code parentOp} at {@code childIndex} is fed into a replicate operator with two outputs:
+     * output 0 goes through a one-to-one exchange (and is materialized) into a forward operator; output 1 goes
+     * through a one-to-one exchange into a local sampling aggregate, then a global range-map aggregate, whose result
+     * becomes the forward operator's second input. The forward operator then replaces the original input of
+     * {@code parentOp} at {@code childIndex}, and {@code parentOp}'s schema and type environment are recomputed.
+     * The forward operator and the returned exchange share a randomly generated (UUID) range map key.
+     * NOTE(review): no exchange is created here between the two aggregates or between the global aggregate and the
+     * forward operator; presumably they are added later by property enforcement - confirm.
+     * @param parentOp the operator requiring the range partitioning; its input at {@code childIndex} is replaced
+     * @param ctx optimization context
+     * @param targetDomain target node domain; its cardinality is passed as the number of partitions of the range map
+     * @param partitioningColumns the columns (and their order) to range-partition on; these are also the sampled columns
+     * @param childIndex index of the input of {@code parentOp} to repartition
+     * @return a {@link RangePartitionExchangePOperator} keyed by the dynamically computed range map
+     * @throws AlgebricksException propagated from the operator-construction helpers and type-environment computation
+     */
+    private IPhysicalOperator createDynamicRangePartitionExchangePOperator(AbstractLogicalOperator parentOp,
+            IOptimizationContext ctx, INodeDomain targetDomain, List<OrderColumn> partitioningColumns, int childIndex)
+            throws AlgebricksException {
+        SourceLocation sourceLoc = parentOp.getSourceLocation();
+        // #1. create the replicate operator and add it above the source op feeding the parent operator
+        ReplicateOperator replicateOp = createReplicateOperator(parentOp.getInputs().get(childIndex), ctx, sourceLoc);
+
+        // These two exchange ops are needed so that the parents of replicate stay the same during later optimizations.
+        // This is because replicate operator has references to its parents. If any later optimizations add new parents,
+        // then replicate would still point to the old ones.
+        MutableObject<ILogicalOperator> replicateOpRef = new MutableObject<>(replicateOp);
+        ExchangeOperator exchToLocalAgg = createOneToOneExchangeOp(replicateOpRef, ctx);
+        ExchangeOperator exchToForward = createOneToOneExchangeOp(replicateOpRef, ctx);
+        MutableObject<ILogicalOperator> exchToLocalAggRef = new MutableObject<>(exchToLocalAgg);
+        MutableObject<ILogicalOperator> exchToForwardRef = new MutableObject<>(exchToForward);
+
+        // add the exchange-to-forward at output 0 and the exchange-to-local-aggregate at output 1
+        replicateOp.getOutputs().add(exchToForwardRef);
+        replicateOp.getOutputs().add(exchToLocalAggRef);
+        // materialize output 0 (the data going to forward) so it can be read again after sampling is done
+        replicateOp.getOutputMaterializationFlags()[0] = true;
+
+        // #2. create the aggregate operators and their sampling functions
+        // $$samplingResultVar = local_samplingFun($$partitioning_column)
+        // $$rangeMapResultVar = global_rangeMapFun($$samplingResultVar)
+        List<LogicalVariable> samplingResultVar = new ArrayList<>(1);
+        List<LogicalVariable> rangeMapResultVar = new ArrayList<>(1);
+        List<Mutable<ILogicalExpression>> samplingFun = new ArrayList<>(1);
+        List<Mutable<ILogicalExpression>> rangeMapFun = new ArrayList<>(1);
+
+        createAggregateFunction(ctx, samplingResultVar, samplingFun, rangeMapResultVar, rangeMapFun,
+                targetDomain.cardinality(), partitioningColumns, sourceLoc);
+
+        AggregateOperator localAggOp =
+                createAggregate(samplingResultVar, false, samplingFun, exchToLocalAggRef, ctx, sourceLoc);
+        MutableObject<ILogicalOperator> localAgg = new MutableObject<>(localAggOp);
+        AggregateOperator globalAggOp = createAggregate(rangeMapResultVar, true, rangeMapFun, localAgg, ctx, sourceLoc);
+        MutableObject<ILogicalOperator> globalAgg = new MutableObject<>(globalAggOp);
+
+        // #3. create the forward operator; the random key links it to the exchange returned below
+        String rangeMapKey = UUID.randomUUID().toString();
+        LogicalVariable rangeMapVar = rangeMapResultVar.get(0);
+        ForwardOperator forward = createForward(rangeMapKey, rangeMapVar, exchToForwardRef, globalAgg, ctx, sourceLoc);
+        MutableObject<ILogicalOperator> forwardRef = new MutableObject<>(forward);
+
+        // replace the old input of parentOp requiring the range partitioning with the new forward op
+        parentOp.getInputs().set(childIndex, forwardRef);
+        parentOp.recomputeSchema();
+        ctx.computeAndSetTypeEnvironmentForOperator(parentOp);
+
+        return new RangePartitionExchangePOperator(partitioningColumns, rangeMapKey, targetDomain);
+    }
+    /**
+     * Creates a replicate operator (physical {@link ReplicatePOperator}) on top of {@code inputOperator}.
+     * It is built with {@code new ReplicateOperator(2)}, presumably for two outputs. The outputs are not attached
+     * here; the caller adds them.
+     * The execution mode is set through {@code OperatorManipulationUtil.setOperatorMode}.
+     * @param inputOperator the operator whose output is replicated
+     * @param context optimization context, used to compute the new operator's type environment
+     * @param sourceLocation source location assigned to the new operator
+     * @return the new replicate operator, with its schema and type environment computed
+     * @throws AlgebricksException if computing the type environment fails
+     */
+    private static ReplicateOperator createReplicateOperator(Mutable<ILogicalOperator> inputOperator,
+            IOptimizationContext context, SourceLocation sourceLocation) throws AlgebricksException {
+        ReplicateOperator replicateOperator = new ReplicateOperator(2);
+        replicateOperator.setPhysicalOperator(new ReplicatePOperator());
+        replicateOperator.setSourceLocation(sourceLocation);
+        replicateOperator.getInputs().add(inputOperator);
+        OperatorManipulationUtil.setOperatorMode(replicateOperator);
+        replicateOperator.recomputeSchema();
+        context.computeAndSetTypeEnvironmentForOperator(replicateOperator);
+        return replicateOperator;
+    }
+
+    /**
+     * Creates the sampling expressions and adds them to {@code localAggFunctions} and {@code globalAggFunctions}.
+     * It also creates the variables that will hold the result of each one.
+     * {@code localResultVariables}, {@code localAggFunctions}, {@code globalResultVariables} and
+     * {@code globalAggFunctions} will be used when creating the corresponding aggregate operators.
+     * The local sampling expression gets one opaque parameter: the value of
+     * {@code context.getPhysicalOptimizationConfig().getSortSamples()}. The global range-map expression gets
+     * {@code numPartitions} and a {@code boolean[]} holding one flag per partitioning field; each flag is true only
+     * for an ASC column.
+     * @param context used to look up the sampling/range-map functions and to get new variables which will be
+     *            assigned the samples and the range map
+     * @param localResultVariables list to which the new variable holding the local sampling result is added
+     * @param localAggFunctions the local sampling expression is added to this list
+     * @param globalResultVariables list to which the new variable holding the range map is added
+     * @param globalAggFunctions the expression generating a range map is added to this list
+     * @param numPartitions passed to the expression generating a range map to know how many split points are needed
+     * @param partFields the fields based on which the partitioner partitions the tuples; these are also the sampled
+     *            fields
+     * @param sourceLocation source location assigned to every created expression
+     */
+    private void createAggregateFunction(IOptimizationContext context, List<LogicalVariable> localResultVariables,
+            List<Mutable<ILogicalExpression>> localAggFunctions, List<LogicalVariable> globalResultVariables,
+            List<Mutable<ILogicalExpression>> globalAggFunctions, int numPartitions, List<OrderColumn> partFields,
+            SourceLocation sourceLocation) {
+        // prepare the arguments of the local sampling function: the sampled fields
+        List<Mutable<ILogicalExpression>> sampledFields = new ArrayList<>(partFields.size());
+        partFields.forEach(f -> {
+            AbstractLogicalExpression sampledField = new VariableReferenceExpression(f.getColumn());
+            sampledField.setSourceLocation(sourceLocation);
+            sampledFields.add(new MutableObject<>(sampledField));
+        });
+
+        // local info: sampling function over the partitioning fields, parameterized by the configured sample count
+        IFunctionInfo samplingFun = context.getMetadataProvider().lookupFunction(localSamplingFun);
+        AbstractFunctionCallExpression samplingExp =
+                new AggregateFunctionCallExpression(samplingFun, false, sampledFields);
+        samplingExp.setSourceLocation(sourceLocation);
+        LogicalVariable samplingResultVar = context.newVar();
+        localResultVariables.add(samplingResultVar);
+        localAggFunctions.add(new MutableObject<>(samplingExp));
+        Object[] samplingParam = { context.getPhysicalOptimizationConfig().getSortSamples() };
+        samplingExp.setOpaqueParameters(samplingParam);
+
+        // prepare the argument of the global range map generator function: the result of the local function
+        List<Mutable<ILogicalExpression>> arg = new ArrayList<>(1);
+        AbstractLogicalExpression samplingResultVarExp = new VariableReferenceExpression(samplingResultVar);
+        samplingResultVarExp.setSourceLocation(sourceLocation);
+        arg.add(new MutableObject<>(samplingResultVarExp));
+
+        // global info: range map function over the local sampling result
+        IFunctionInfo rangeMapFun = context.getMetadataProvider().lookupFunction(rangeMapFunction);
+        AbstractFunctionCallExpression rangeMapExp = new AggregateFunctionCallExpression(rangeMapFun, true, arg);
+        rangeMapExp.setSourceLocation(sourceLocation);
+        globalResultVariables.add(context.newVar());
+        globalAggFunctions.add(new MutableObject<>(rangeMapExp));
+
+        int i = 0;
+        boolean[] ascendingFlags = new boolean[partFields.size()];
+        for (OrderColumn column : partFields) {
+            ascendingFlags[i] = column.getOrder() == OrderOperator.IOrder.OrderKind.ASC;
+            i++;
+        }
+        rangeMapExp.setOpaqueParameters(new Object[] { numPartitions, ascendingFlags });
+    }
+
+    /**
+     * Creates an aggregate operator. $$resultVariables = expressions()
+     * A local aggregate ({@code isGlobal == false}) is given LOCAL execution mode; a global one is given UNPARTITIONED
+     * execution mode.
+     * @param resultVariables the variables that store the result of the aggregation
+     * @param isGlobal whether the aggregate operator is a global or local one
+     * @param expressions the aggregation functions desired
+     * @param inputOperator the input op that is feeding the aggregate operator
+     * @param context optimization context
+     * @param sourceLocation source location
+     * @return an aggregate operator with the specified information
+     * @throws AlgebricksException when there is an error setting the type environment of the newly created
+     *             aggregate op
+     */
+    private static AggregateOperator createAggregate(List<LogicalVariable> resultVariables, boolean isGlobal,
+            List<Mutable<ILogicalExpression>> expressions, MutableObject<ILogicalOperator> inputOperator,
+            IOptimizationContext context, SourceLocation sourceLocation) throws AlgebricksException {
+        AggregateOperator aggregateOperator = new AggregateOperator(resultVariables, expressions);
+        aggregateOperator.setPhysicalOperator(new AggregatePOperator());
+        aggregateOperator.setSourceLocation(sourceLocation);
+        aggregateOperator.getInputs().add(inputOperator);
+        aggregateOperator.setGlobal(isGlobal);
+        if (!isGlobal) {
+            aggregateOperator.setExecutionMode(AbstractLogicalOperator.ExecutionMode.LOCAL);
+        } else {
+            aggregateOperator.setExecutionMode(AbstractLogicalOperator.ExecutionMode.UNPARTITIONED);
+        }
+        aggregateOperator.recomputeSchema();
+        context.computeAndSetTypeEnvironmentForOperator(aggregateOperator);
+        return aggregateOperator;
+    }
+    /**
+     * Creates a one-to-one exchange operator (physical {@link OneToOneExchangePOperator}) in PARTITIONED execution
+     * mode on top of {@code inputOperator}.
+     * NOTE(review): unlike the other helpers here, no source location is set on the new operator - confirm this is
+     * intended.
+     * @param inputOperator the operator feeding the exchange
+     * @param context optimization context, used to compute the new operator's type environment
+     * @return the new exchange operator, with its schema and type environment computed
+     * @throws AlgebricksException if computing the type environment fails
+     */
+    private static ExchangeOperator createOneToOneExchangeOp(MutableObject<ILogicalOperator> inputOperator,
+            IOptimizationContext context) throws AlgebricksException {
+        ExchangeOperator exchangeOperator = new ExchangeOperator();
+        exchangeOperator.setPhysicalOperator(new OneToOneExchangePOperator());
+        exchangeOperator.getInputs().add(inputOperator);
+        exchangeOperator.setExecutionMode(AbstractLogicalOperator.ExecutionMode.PARTITIONED);
+        exchangeOperator.recomputeSchema();
+        context.computeAndSetTypeEnvironmentForOperator(exchangeOperator);
+        return exchangeOperator;
+    }
+    /**
+     * Creates a forward operator (physical {@link ForwardPOperator}) with two inputs, in this order:
+     * input 0 is {@code exchangeOpFromReplicate} (the data to be partitioned); input 1 is {@code globalAggInput} (the
+     * range map producer). The operator is constructed with {@code rangeMapKey} and a variable-reference expression
+     * on {@code rangeMapVariable}. Its execution mode is set through {@code OperatorManipulationUtil.setOperatorMode}.
+     * @param rangeMapKey key associated with the range map; the visible caller passes the same key to the
+     *            range-partitioning exchange it returns
+     * @param rangeMapVariable the variable that holds the range map computed by the global aggregate
+     * @param exchangeOpFromReplicate the exchange carrying the replicated data (first input)
+     * @param globalAggInput the aggregate producing the range map (second input)
+     * @param context optimization context, used to compute the new operator's type environment
+     * @param sourceLocation source location assigned to the operator and its range map expression
+     * @return the new forward operator, with its schema and type environment computed
+     * @throws AlgebricksException if computing the type environment fails
+     */
+    private static ForwardOperator createForward(String rangeMapKey, LogicalVariable rangeMapVariable,
+            MutableObject<ILogicalOperator> exchangeOpFromReplicate, MutableObject<ILogicalOperator> globalAggInput,
+            IOptimizationContext context, SourceLocation sourceLocation) throws AlgebricksException {
+        AbstractLogicalExpression rangeMapExpression = new VariableReferenceExpression(rangeMapVariable);
+        rangeMapExpression.setSourceLocation(sourceLocation);
+        ForwardOperator forwardOperator = new ForwardOperator(rangeMapKey, new MutableObject<>(rangeMapExpression));
+        forwardOperator.setSourceLocation(sourceLocation);
+        forwardOperator.setPhysicalOperator(new ForwardPOperator());
+        forwardOperator.getInputs().add(exchangeOpFromReplicate);
+        forwardOperator.getInputs().add(globalAggInput);
+        OperatorManipulationUtil.setOperatorMode(forwardOperator);
+        forwardOperator.recomputeSchema();
+        context.computeAndSetTypeEnvironmentForOperator(forwardOperator);
+        return forwardOperator;
+    }
+
     private boolean allAreOrderProps(List<ILocalStructuralProperty> cldLocals) {
         for (ILocalStructuralProperty lsp : cldLocals) {
             if (lsp.getPropertyType() != PropertyType.LOCAL_ORDER_PROPERTY) {