http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java new file mode 100644 index 0000000..e85f289 --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java @@ -0,0 +1,589 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.dag; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +import org.apache.flink.api.common.ExecutionMode; +import org.apache.flink.api.common.operators.SemanticProperties; +import org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties; +import org.apache.flink.api.common.operators.base.DeltaIterationBase; +import org.apache.flink.api.common.operators.util.FieldList; +import org.apache.flink.api.common.typeinfo.NothingTypeInfo; +import org.apache.flink.optimizer.CompilerException; +import org.apache.flink.optimizer.DataStatistics; +import org.apache.flink.optimizer.traversals.InterestingPropertyVisitor; +import org.apache.flink.optimizer.costs.CostEstimator; +import org.apache.flink.optimizer.dataproperties.GlobalProperties; +import org.apache.flink.optimizer.dataproperties.InterestingProperties; +import org.apache.flink.optimizer.dataproperties.LocalProperties; +import org.apache.flink.optimizer.dataproperties.PartitioningProperty; +import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties; +import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; +import org.apache.flink.optimizer.operators.OperatorDescriptorDual; +import org.apache.flink.optimizer.operators.SolutionSetDeltaOperator; +import org.apache.flink.optimizer.plan.Channel; +import org.apache.flink.optimizer.plan.DualInputPlanNode; +import org.apache.flink.optimizer.plan.NamedChannel; +import org.apache.flink.optimizer.plan.PlanNode; +import org.apache.flink.optimizer.plan.SingleInputPlanNode; +import org.apache.flink.optimizer.plan.SolutionSetPlanNode; +import org.apache.flink.optimizer.plan.WorksetIterationPlanNode; +import org.apache.flink.optimizer.plan.WorksetPlanNode; +import org.apache.flink.optimizer.plan.PlanNode.FeedbackPropertiesMeetRequirementsReport; +import org.apache.flink.optimizer.util.NoOpBinaryUdfOp; +import org.apache.flink.runtime.operators.DriverStrategy; +import org.apache.flink.runtime.operators.shipping.ShipStrategyType; +import 
org.apache.flink.runtime.operators.util.LocalStrategy;
+import org.apache.flink.types.Nothing;
+import org.apache.flink.util.Visitor;
+
+/**
+ * A node in the optimizer's program representation for a workset iteration.
+ */
+public class WorksetIterationNode extends TwoInputNode implements IterationNode {
+
+	private static final int DEFAULT_COST_WEIGHT = 20;
+
+
+	private final FieldList solutionSetKeyFields;
+
+	private final GlobalProperties partitionedProperties;
+
+	private final List<OperatorDescriptorDual> dataProperties;
+
+	private SolutionSetNode solutionSetNode;
+
+	private WorksetNode worksetNode;
+
+	private OptimizerNode solutionSetDelta;
+
+	private OptimizerNode nextWorkset;
+
+	private DagConnection solutionSetDeltaRootConnection;
+
+	private DagConnection nextWorksetRootConnection;
+
+	private SingleRootJoiner singleRoot;
+
+	private boolean solutionDeltaImmediatelyAfterSolutionJoin;
+
+	private final int costWeight;
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Creates a new optimizer node for the given workset (delta) iteration operator.
+	 *
+	 * @param iteration The iteration operator that the node represents.
+	 */
+	public WorksetIterationNode(DeltaIterationBase<?, ?> iteration) {
+		super(iteration);
+
+		final int[] ssKeys = iteration.getSolutionSetKeyFields();
+		if (ssKeys == null || ssKeys.length == 0) {
+			throw new CompilerException("Invalid WorksetIteration: No key fields defined for the solution set.");
+		}
+		this.solutionSetKeyFields = new FieldList(ssKeys);
+		this.partitionedProperties = new GlobalProperties();
+		this.partitionedProperties.setHashPartitioned(this.solutionSetKeyFields);
+
+		int weight = iteration.getMaximumNumberOfIterations() > 0 ?
+			iteration.getMaximumNumberOfIterations() : DEFAULT_COST_WEIGHT;
+
+		if (weight > OptimizerNode.MAX_DYNAMIC_PATH_COST_WEIGHT) {
+			weight = OptimizerNode.MAX_DYNAMIC_PATH_COST_WEIGHT;
+		}
+		this.costWeight = weight;
+
+		this.dataProperties = Collections.<OperatorDescriptorDual>singletonList(new WorksetOpDescriptor(this.solutionSetKeyFields));
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	public DeltaIterationBase<?, ?> getIterationContract() {
+		return (DeltaIterationBase<?, ?>) getOperator();
+	}
+
+	public SolutionSetNode getSolutionSetNode() {
+		return this.solutionSetNode;
+	}
+
+	public WorksetNode getWorksetNode() {
+		return this.worksetNode;
+	}
+
+	public OptimizerNode getNextWorkset() {
+		return this.nextWorkset;
+	}
+
+	public OptimizerNode getSolutionSetDelta() {
+		return this.solutionSetDelta;
+	}
+
+	public void setPartialSolution(SolutionSetNode solutionSetNode, WorksetNode worksetNode) {
+		if (this.solutionSetNode != null || this.worksetNode != null) {
+			throw new IllegalStateException("Error: Initializing WorksetIterationNode multiple times.");
+		}
+		this.solutionSetNode = solutionSetNode;
+		this.worksetNode = worksetNode;
+	}
+
+	public void setNextPartialSolution(OptimizerNode solutionSetDelta, OptimizerNode nextWorkset,
+										ExecutionMode executionMode) {
+
+		// check whether the next partial solution is itself the join with
+		// the partial solution (so we can potentially do direct updates)
+		if (solutionSetDelta instanceof TwoInputNode) {
+			TwoInputNode solutionDeltaTwoInput = (TwoInputNode) solutionSetDelta;
+			if (solutionDeltaTwoInput.getFirstPredecessorNode() == this.solutionSetNode ||
+				solutionDeltaTwoInput.getSecondPredecessorNode() == this.solutionSetNode)
+			{
+
this.solutionDeltaImmediatelyAfterSolutionJoin = true; + } + } + + // there needs to be at least one node in the workset path, so + // if the next workset is equal to the workset, we need to inject a no-op node + if (nextWorkset == worksetNode || nextWorkset instanceof BinaryUnionNode) { + NoOpNode noop = new NoOpNode(); + noop.setDegreeOfParallelism(getParallelism()); + + DagConnection noOpConn = new DagConnection(nextWorkset, noop, executionMode); + noop.setIncomingConnection(noOpConn); + nextWorkset.addOutgoingConnection(noOpConn); + + nextWorkset = noop; + } + + // attach an extra node to the solution set delta for the cases where we need to repartition + UnaryOperatorNode solutionSetDeltaUpdateAux = new UnaryOperatorNode("Solution-Set Delta", getSolutionSetKeyFields(), + new SolutionSetDeltaOperator(getSolutionSetKeyFields())); + solutionSetDeltaUpdateAux.setDegreeOfParallelism(getParallelism()); + + DagConnection conn = new DagConnection(solutionSetDelta, solutionSetDeltaUpdateAux, executionMode); + solutionSetDeltaUpdateAux.setIncomingConnection(conn); + solutionSetDelta.addOutgoingConnection(conn); + + this.solutionSetDelta = solutionSetDeltaUpdateAux; + this.nextWorkset = nextWorkset; + + this.singleRoot = new SingleRootJoiner(); + this.solutionSetDeltaRootConnection = new DagConnection(solutionSetDeltaUpdateAux, + this.singleRoot, executionMode); + + this.nextWorksetRootConnection = new DagConnection(nextWorkset, this.singleRoot, executionMode); + this.singleRoot.setInputs(this.solutionSetDeltaRootConnection, this.nextWorksetRootConnection); + + solutionSetDeltaUpdateAux.addOutgoingConnection(this.solutionSetDeltaRootConnection); + nextWorkset.addOutgoingConnection(this.nextWorksetRootConnection); + } + + public int getCostWeight() { + return this.costWeight; + } + + public TwoInputNode getSingleRootOfStepFunction() { + return this.singleRoot; + } + + public FieldList getSolutionSetKeyFields() { + return this.solutionSetKeyFields; + } + + public OptimizerNode getInitialSolutionSetPredecessorNode() { + return getFirstPredecessorNode(); + } + + public OptimizerNode getInitialWorksetPredecessorNode() { + return getSecondPredecessorNode(); + } + + // -------------------------------------------------------------------------------------------- + + @Override + public String getName() { + return "Workset Iteration"; + } + + @Override + public SemanticProperties getSemanticProperties() { + return new EmptySemanticProperties(); + } + + protected void readStubAnnotations() {} + + @Override + protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) { + this.estimatedOutputSize = getFirstPredecessorNode().getEstimatedOutputSize(); + this.estimatedNumRecords = getFirstPredecessorNode().getEstimatedNumRecords(); + } + + // -------------------------------------------------------------------------------------------- + // Properties and Optimization + // -------------------------------------------------------------------------------------------- + + @Override + protected List<OperatorDescriptorDual> getPossibleProperties() { + return this.dataProperties; + } + + @Override + public void computeInterestingPropertiesForInputs(CostEstimator estimator) { + // our own solution (the solution set) is always partitioned and this cannot be adjusted + // depending on what the successor to the workset iteration requests. for that reason, + // we ignore incoming interesting properties. 
+ + // in addition, we need to make 2 interesting property passes, because the root of the step function + // that computes the next workset needs the interesting properties as generated by the + // workset source of the step function. the second pass concerns only the workset path. + // as initial interesting properties, we have the trivial ones for the step function, + // and partitioned on the solution set key for the solution set delta + + RequestedGlobalProperties partitionedProperties = new RequestedGlobalProperties(); + partitionedProperties.setHashPartitioned(this.solutionSetKeyFields); + InterestingProperties partitionedIP = new InterestingProperties(); + partitionedIP.addGlobalProperties(partitionedProperties); + partitionedIP.addLocalProperties(new RequestedLocalProperties()); + + this.nextWorksetRootConnection.setInterestingProperties(new InterestingProperties()); + this.solutionSetDeltaRootConnection.setInterestingProperties(partitionedIP.clone()); + + InterestingPropertyVisitor ipv = new InterestingPropertyVisitor(estimator); + this.nextWorkset.accept(ipv); + this.solutionSetDelta.accept(ipv); + + // take the interesting properties of the partial solution and add them to the root interesting properties + InterestingProperties worksetIntProps = this.worksetNode.getInterestingProperties(); + InterestingProperties intProps = new InterestingProperties(); + intProps.getGlobalProperties().addAll(worksetIntProps.getGlobalProperties()); + intProps.getLocalProperties().addAll(worksetIntProps.getLocalProperties()); + + // clear all interesting properties to prepare the second traversal + this.nextWorksetRootConnection.clearInterestingProperties(); + this.nextWorkset.accept(InterestingPropertiesClearer.INSTANCE); + + // 2nd pass + this.nextWorksetRootConnection.setInterestingProperties(intProps); + this.nextWorkset.accept(ipv); + + // now add the interesting properties of the workset to the workset input + final InterestingProperties inProps = this.worksetNode.getInterestingProperties().clone(); + inProps.addGlobalProperties(new RequestedGlobalProperties()); + inProps.addLocalProperties(new RequestedLocalProperties()); + this.input2.setInterestingProperties(inProps); + + // the partial solution must be hash partitioned, so it has only that as interesting properties + this.input1.setInterestingProperties(partitionedIP); + } + + @Override + public void clearInterestingProperties() { + super.clearInterestingProperties(); + + this.nextWorksetRootConnection.clearInterestingProperties(); + this.solutionSetDeltaRootConnection.clearInterestingProperties(); + + this.nextWorkset.accept(InterestingPropertiesClearer.INSTANCE); + this.solutionSetDelta.accept(InterestingPropertiesClearer.INSTANCE); + } + + @Override + protected void instantiate(OperatorDescriptorDual operator, Channel solutionSetIn, Channel worksetIn, + List<Set<? extends NamedChannel>> broadcastPlanChannels, List<PlanNode> target, CostEstimator estimator, + RequestedGlobalProperties globPropsReqSolutionSet, RequestedGlobalProperties globPropsReqWorkset, + RequestedLocalProperties locPropsReqSolutionSet, RequestedLocalProperties locPropsReqWorkset) + { + // check for pipeline breaking using hash join with build on the solution set side + placePipelineBreakersIfNecessary(DriverStrategy.HYBRIDHASH_BUILD_FIRST, solutionSetIn, worksetIn); + + // NOTES ON THE ENUMERATION OF THE STEP FUNCTION PLANS: + // Whenever we instantiate the iteration, we enumerate new candidates for the step function. 
+ // That way, we make sure we have an appropriate plan for each candidate for the initial partial solution, + // we have a fitting candidate for the step function (often, work is pushed out of the step function). + // Among the candidates of the step function, we keep only those that meet the requested properties of the + // current candidate initial partial solution. That makes sure these properties exist at the beginning of + // every iteration. + + // 1) Because we enumerate multiple times, we may need to clean the cached plans + // before starting another enumeration + this.nextWorkset.accept(PlanCacheCleaner.INSTANCE); + this.solutionSetDelta.accept(PlanCacheCleaner.INSTANCE); + + // 2) Give the partial solution the properties of the current candidate for the initial partial solution + // This concerns currently only the workset. + this.worksetNode.setCandidateProperties(worksetIn.getGlobalProperties(), worksetIn.getLocalProperties(), worksetIn); + this.solutionSetNode.setCandidateProperties(this.partitionedProperties, new LocalProperties(), solutionSetIn); + + final SolutionSetPlanNode sspn = this.solutionSetNode.getCurrentSolutionSetPlanNode(); + final WorksetPlanNode wspn = this.worksetNode.getCurrentWorksetPlanNode(); + + // 3) Get the alternative plans + List<PlanNode> solutionSetDeltaCandidates = this.solutionSetDelta.getAlternativePlans(estimator); + List<PlanNode> worksetCandidates = this.nextWorkset.getAlternativePlans(estimator); + + // 4) Throw away all that are not compatible with the properties currently requested to the + // initial partial solution + + // Make sure that the workset candidates fulfill the input requirements + { + List<PlanNode> newCandidates = new ArrayList<PlanNode>(); + + for (Iterator<PlanNode> planDeleter = worksetCandidates.iterator(); planDeleter.hasNext(); ) { + PlanNode candidate = planDeleter.next(); + + GlobalProperties atEndGlobal = candidate.getGlobalProperties(); + LocalProperties atEndLocal = candidate.getLocalProperties(); + + FeedbackPropertiesMeetRequirementsReport report = candidate.checkPartialSolutionPropertiesMet(wspn, + atEndGlobal, atEndLocal); + + if (report == FeedbackPropertiesMeetRequirementsReport.NO_PARTIAL_SOLUTION) { + ; // depends only through broadcast variable on the workset solution + } + else if (report == FeedbackPropertiesMeetRequirementsReport.NOT_MET) { + // attach a no-op node through which we create the properties of the original input + Channel toNoOp = new Channel(candidate); + globPropsReqWorkset.parameterizeChannel(toNoOp, false, + nextWorksetRootConnection.getDataExchangeMode(), false); + locPropsReqWorkset.parameterizeChannel(toNoOp); + + UnaryOperatorNode rebuildWorksetPropertiesNode = new UnaryOperatorNode("Rebuild Workset Properties", + FieldList.EMPTY_LIST); + + rebuildWorksetPropertiesNode.setDegreeOfParallelism(candidate.getParallelism()); + + SingleInputPlanNode rebuildWorksetPropertiesPlanNode = new SingleInputPlanNode( + rebuildWorksetPropertiesNode, "Rebuild Workset Properties", + toNoOp, DriverStrategy.UNARY_NO_OP); + rebuildWorksetPropertiesPlanNode.initProperties(toNoOp.getGlobalProperties(), + toNoOp.getLocalProperties()); + estimator.costOperator(rebuildWorksetPropertiesPlanNode); + + GlobalProperties atEndGlobalModified = rebuildWorksetPropertiesPlanNode.getGlobalProperties(); + LocalProperties atEndLocalModified = rebuildWorksetPropertiesPlanNode.getLocalProperties(); + + if (!(atEndGlobalModified.equals(atEndGlobal) && atEndLocalModified.equals(atEndLocal))) { + 
FeedbackPropertiesMeetRequirementsReport report2 = candidate.checkPartialSolutionPropertiesMet( + wspn, atEndGlobalModified, atEndLocalModified); + if (report2 != FeedbackPropertiesMeetRequirementsReport.NOT_MET) { + newCandidates.add(rebuildWorksetPropertiesPlanNode); + } + } + + // remove the original operator and add the modified candidate + planDeleter.remove(); + + } + } + + worksetCandidates.addAll(newCandidates); + } + + if (worksetCandidates.isEmpty()) { + return; + } + + // sanity check the solution set delta + for (PlanNode solutionSetDeltaCandidate : solutionSetDeltaCandidates) { + SingleInputPlanNode candidate = (SingleInputPlanNode) solutionSetDeltaCandidate; + GlobalProperties gp = candidate.getGlobalProperties(); + + if (gp.getPartitioning() != PartitioningProperty.HASH_PARTITIONED || gp.getPartitioningFields() == null || + !gp.getPartitioningFields().equals(this.solutionSetKeyFields)) { + throw new CompilerException("Bug: The solution set delta is not partitioned."); + } + } + + // 5) Create a candidate for the Iteration Node for every remaining plan of the step function. + + final GlobalProperties gp = new GlobalProperties(); + gp.setHashPartitioned(this.solutionSetKeyFields); + gp.addUniqueFieldCombination(this.solutionSetKeyFields); + + LocalProperties lp = LocalProperties.EMPTY.addUniqueFields(this.solutionSetKeyFields); + + // take all combinations of solution set delta and workset plans + for (PlanNode solutionSetCandidate : solutionSetDeltaCandidates) { + for (PlanNode worksetCandidate : worksetCandidates) { + // check whether they have the same operator at their latest branching point + if (this.singleRoot.areBranchCompatible(solutionSetCandidate, worksetCandidate)) { + + SingleInputPlanNode siSolutionDeltaCandidate = (SingleInputPlanNode) solutionSetCandidate; + boolean immediateDeltaUpdate; + + // check whether we need a dedicated solution set delta operator, or whether we can update on the fly + if (siSolutionDeltaCandidate.getInput().getShipStrategy() == ShipStrategyType.FORWARD && + this.solutionDeltaImmediatelyAfterSolutionJoin) + { + // we do not need this extra node. we can make the predecessor the delta + // sanity check the node and connection + if (siSolutionDeltaCandidate.getDriverStrategy() != DriverStrategy.UNARY_NO_OP || + siSolutionDeltaCandidate.getInput().getLocalStrategy() != LocalStrategy.NONE) + { + throw new CompilerException("Invalid Solution set delta node."); + } + + solutionSetCandidate = siSolutionDeltaCandidate.getInput().getSource(); + immediateDeltaUpdate = true; + } else { + // was not partitioned, we need to keep this node. 
+ // mark that we materialize the input + siSolutionDeltaCandidate.getInput().setTempMode(TempMode.PIPELINE_BREAKER); + immediateDeltaUpdate = false; + } + + WorksetIterationPlanNode wsNode = new WorksetIterationPlanNode(this, + "WorksetIteration ("+this.getOperator().getName()+")", solutionSetIn, + worksetIn, sspn, wspn, worksetCandidate, solutionSetCandidate); + wsNode.setImmediateSolutionSetUpdate(immediateDeltaUpdate); + wsNode.initProperties(gp, lp); + target.add(wsNode); + } + } + } + } + + @Override + public void computeUnclosedBranchStack() { + if (this.openBranches != null) { + return; + } + + // IMPORTANT: First compute closed branches from the two inputs + // we need to do this because the runtime iteration head effectively joins + addClosedBranches(getFirstPredecessorNode().closedBranchingNodes); + addClosedBranches(getSecondPredecessorNode().closedBranchingNodes); + + List<UnclosedBranchDescriptor> result1 = getFirstPredecessorNode().getBranchesForParent(getFirstIncomingConnection()); + List<UnclosedBranchDescriptor> result2 = getSecondPredecessorNode().getBranchesForParent(getSecondIncomingConnection()); + + ArrayList<UnclosedBranchDescriptor> inputsMerged1 = new ArrayList<UnclosedBranchDescriptor>(); + mergeLists(result1, result2, inputsMerged1, true); // this method also sets which branches are joined here (in the head) + + addClosedBranches(getSingleRootOfStepFunction().closedBranchingNodes); + + ArrayList<UnclosedBranchDescriptor> inputsMerged2 = new ArrayList<UnclosedBranchDescriptor>(); + List<UnclosedBranchDescriptor> result3 = getSingleRootOfStepFunction().openBranches; + mergeLists(inputsMerged1, result3, inputsMerged2, true); + + // handle the data flow branching for the broadcast inputs + List<UnclosedBranchDescriptor> result = computeUnclosedBranchStackForBroadcastInputs(inputsMerged2); + + this.openBranches = (result == null || result.isEmpty()) ? 
Collections.<UnclosedBranchDescriptor>emptyList() : result; + } + + // -------------------------------------------------------------------------------------------- + // Iteration Specific Traversals + // -------------------------------------------------------------------------------------------- + + public void acceptForStepFunction(Visitor<OptimizerNode> visitor) { + this.singleRoot.accept(visitor); + } + + // -------------------------------------------------------------------------------------------- + // Utility Classes + // -------------------------------------------------------------------------------------------- + + private static final class WorksetOpDescriptor extends OperatorDescriptorDual { + + private WorksetOpDescriptor(FieldList solutionSetKeys) { + super(solutionSetKeys, null); + } + + @Override + public DriverStrategy getStrategy() { + return DriverStrategy.NONE; + } + + @Override + protected List<GlobalPropertiesPair> createPossibleGlobalProperties() { + RequestedGlobalProperties partitionedGp = new RequestedGlobalProperties(); + partitionedGp.setHashPartitioned(this.keys1); + return Collections.singletonList(new GlobalPropertiesPair(partitionedGp, new RequestedGlobalProperties())); + } + + @Override + protected List<LocalPropertiesPair> createPossibleLocalProperties() { + // all properties are possible + return Collections.singletonList(new LocalPropertiesPair( + new RequestedLocalProperties(), new RequestedLocalProperties())); + } + + @Override + public boolean areCompatible(RequestedGlobalProperties requested1, RequestedGlobalProperties requested2, + GlobalProperties produced1, GlobalProperties produced2) { + return true; + } + + @Override + public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2, + LocalProperties produced1, LocalProperties produced2) { + return true; + } + + @Override + public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) { + throw new UnsupportedOperationException(); + } + + @Override + public GlobalProperties computeGlobalProperties(GlobalProperties in1, GlobalProperties in2) { + throw new UnsupportedOperationException(); + } + + @Override + public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) { + throw new UnsupportedOperationException(); + } + } + + public static class SingleRootJoiner extends TwoInputNode { + + SingleRootJoiner() { + super(new NoOpBinaryUdfOp<Nothing>(new NothingTypeInfo())); + + setDegreeOfParallelism(1); + } + + public void setInputs(DagConnection input1, DagConnection input2) { + this.input1 = input1; + this.input2 = input2; + } + + @Override + public String getName() { + return "Internal Utility Node"; + } + + @Override + protected List<OperatorDescriptorDual> getPossibleProperties() { + return Collections.emptyList(); + } + + @Override + protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) { + // no estimates are needed here + } + } +}
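[Editor's context note: the program-level counterpart of the WorksetIterationNode above is a delta iteration declared through the DataSet API; the key positions passed to iterateDelta(...) become the solutionSetKeyFields that the constructor validates. The following is a minimal sketch under assumed element types and an assumed join-based step function; none of these names or values come from this commit:]

    import org.apache.flink.api.common.functions.JoinFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.DeltaIteration;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class DeltaIterationSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<Tuple2<Long, Double>> initialSolutionSet = env.fromElements(
                    Tuple2.of(1L, 0.1), Tuple2.of(2L, 0.2));
            DataSet<Tuple2<Long, Double>> initialWorkset = initialSolutionSet;

            // Key position 0 is the solution set key; the optimizer keeps the
            // solution set hash-partitioned on exactly these fields.
            DeltaIteration<Tuple2<Long, Double>, Tuple2<Long, Double>> iteration =
                    initialSolutionSet.iterateDelta(initialWorkset, 20, 0);

            // Step function: a join between workset and solution set. A delta
            // produced directly by such a join is the pattern that
            // setNextPartialSolution(...) detects for immediate solution set updates.
            DataSet<Tuple2<Long, Double>> delta = iteration.getWorkset()
                    .join(iteration.getSolutionSet())
                    .where(0).equalTo(0)
                    .with(new JoinFunction<Tuple2<Long, Double>, Tuple2<Long, Double>, Tuple2<Long, Double>>() {
                        @Override
                        public Tuple2<Long, Double> join(Tuple2<Long, Double> ws, Tuple2<Long, Double> ss) {
                            return Tuple2.of(ws.f0, ws.f1 + ss.f1); // illustrative update rule
                        }
                    });

            // closeWith(solutionSetDelta, nextWorkset)
            DataSet<Tuple2<Long, Double>> result = iteration.closeWith(delta, delta);
            result.print();
        }
    }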
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java
new file mode 100644
index 0000000..3b05aba
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetNode.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.dag;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.operators.base.DeltaIterationBase.WorksetPlaceHolder;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.optimizer.plan.WorksetPlanNode;
+
+/**
+ * The optimizer's internal representation of the workset, i.e. the partial solution that is fed
+ * back as input to the step function of a workset (delta) iteration.
+ */
+public class WorksetNode extends AbstractPartialSolutionNode {
+
+	private final WorksetIterationNode iterationNode;
+
+
+	public WorksetNode(WorksetPlaceHolder<?> psph, WorksetIterationNode iterationNode) {
+		super(psph);
+		this.iterationNode = iterationNode;
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	public void setCandidateProperties(GlobalProperties gProps, LocalProperties lProps, Channel initialInput) {
+		if (this.cachedPlans != null) {
+			throw new IllegalStateException();
+		} else {
+			WorksetPlanNode wspn = new WorksetPlanNode(this, "Workset ("+this.getOperator().getName()+")", gProps, lProps, initialInput);
+			this.cachedPlans = Collections.<PlanNode>singletonList(wspn);
+		}
+	}
+
+	public WorksetPlanNode getCurrentWorksetPlanNode() {
+		if (this.cachedPlans != null) {
+			return (WorksetPlanNode) this.cachedPlans.get(0);
+		} else {
+			throw new IllegalStateException();
+		}
+	}
+
+	public WorksetIterationNode getIterationNode() {
+		return this.iterationNode;
+	}
+
+	@Override
+	public void computeOutputEstimates(DataStatistics statistics) {
+		copyEstimates(this.iterationNode.getInitialWorksetPredecessorNode());
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Gets the contract object for this workset placeholder node.
+	 *
+	 * @return The contract.
+ */ + @Override + public WorksetPlaceHolder<?> getOperator() { + return (WorksetPlaceHolder<?>) super.getOperator(); + } + + @Override + public String getName() { + return "Workset"; + } + + @Override + public void computeUnclosedBranchStack() { + if (this.openBranches != null) { + return; + } + + DagConnection worksetInput = this.iterationNode.getSecondIncomingConnection(); + OptimizerNode worksetSource = worksetInput.getSource(); + + addClosedBranches(worksetSource.closedBranchingNodes); + List<UnclosedBranchDescriptor> fromInput = worksetSource.getBranchesForParent(worksetInput); + this.openBranches = (fromInput == null || fromInput.isEmpty()) ? Collections.<UnclosedBranchDescriptor>emptyList() : fromInput; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java new file mode 100644 index 0000000..57ba29d --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java @@ -0,0 +1,500 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.dataproperties; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.flink.api.common.ExecutionMode; +import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.common.operators.Ordering; +import org.apache.flink.api.common.operators.SemanticProperties; +import org.apache.flink.api.common.operators.util.FieldList; +import org.apache.flink.api.common.operators.util.FieldSet; +import org.apache.flink.optimizer.CompilerException; +import org.apache.flink.optimizer.plan.Channel; +import org.apache.flink.optimizer.util.Utils; +import org.apache.flink.runtime.io.network.DataExchangeMode; +import org.apache.flink.runtime.operators.shipping.ShipStrategyType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class represents global properties of the data at a certain point in the plan. + * Global properties are properties that describe data across different partitions, such as + * whether the data is hash partitioned, range partitioned, replicated, etc. 
+ */ +public class GlobalProperties implements Cloneable { + + public static final Logger LOG = LoggerFactory.getLogger(GlobalProperties.class); + + private PartitioningProperty partitioning; // the type partitioning + + private FieldList partitioningFields; // the fields which are partitioned + + private Ordering ordering; // order of the partitioned fields, if it is an ordered (range) range partitioning + + private Set<FieldSet> uniqueFieldCombinations; + + private Partitioner<?> customPartitioner; + + // -------------------------------------------------------------------------------------------- + + /** + * Initializes the global properties with no partitioning. + */ + public GlobalProperties() { + this.partitioning = PartitioningProperty.RANDOM_PARTITIONED; + } + + // -------------------------------------------------------------------------------------------- + + /** + * Sets this global properties to represent a hash partitioning. + * + * @param partitionedFields The key fields on which the data is hash partitioned. + */ + public void setHashPartitioned(FieldList partitionedFields) { + if (partitionedFields == null) { + throw new NullPointerException(); + } + + this.partitioning = PartitioningProperty.HASH_PARTITIONED; + this.partitioningFields = partitionedFields; + this.ordering = null; + } + + + public void setRangePartitioned(Ordering ordering) { + if (ordering == null) { + throw new NullPointerException(); + } + + this.partitioning = PartitioningProperty.RANGE_PARTITIONED; + this.ordering = ordering; + this.partitioningFields = ordering.getInvolvedIndexes(); + } + + public void setAnyPartitioning(FieldList partitionedFields) { + if (partitionedFields == null) { + throw new NullPointerException(); + } + + this.partitioning = PartitioningProperty.ANY_PARTITIONING; + this.partitioningFields = partitionedFields; + this.ordering = null; + } + + public void setRandomPartitioned() { + this.partitioning = PartitioningProperty.RANDOM_PARTITIONED; + this.partitioningFields = null; + this.ordering = null; + } + + public void setFullyReplicated() { + this.partitioning = PartitioningProperty.FULL_REPLICATION; + this.partitioningFields = null; + this.ordering = null; + } + + public void setForcedRebalanced() { + this.partitioning = PartitioningProperty.FORCED_REBALANCED; + this.partitioningFields = null; + this.ordering = null; + } + + public void setCustomPartitioned(FieldList partitionedFields, Partitioner<?> partitioner) { + if (partitionedFields == null || partitioner == null) { + throw new NullPointerException(); + } + + this.partitioning = PartitioningProperty.CUSTOM_PARTITIONING; + this.partitioningFields = partitionedFields; + this.ordering = null; + this.customPartitioner = partitioner; + } + + public void addUniqueFieldCombination(FieldSet fields) { + if (fields == null) { + return; + } + if (this.uniqueFieldCombinations == null) { + this.uniqueFieldCombinations = new HashSet<FieldSet>(); + } + this.uniqueFieldCombinations.add(fields); + } + + public void clearUniqueFieldCombinations() { + if (this.uniqueFieldCombinations != null) { + this.uniqueFieldCombinations = null; + } + } + + public Set<FieldSet> getUniqueFieldCombination() { + return this.uniqueFieldCombinations; + } + + public FieldList getPartitioningFields() { + return this.partitioningFields; + } + + public Ordering getPartitioningOrdering() { + return this.ordering; + } + + public PartitioningProperty getPartitioning() { + return this.partitioning; + } + + public Partitioner<?> getCustomPartitioner() { + return 
this.customPartitioner; + } + + // -------------------------------------------------------------------------------------------- + + public boolean isPartitionedOnFields(FieldSet fields) { + if (this.partitioning.isPartitionedOnKey() && fields.isValidSubset(this.partitioningFields)) { + return true; + } else if (this.uniqueFieldCombinations != null) { + for (FieldSet set : this.uniqueFieldCombinations) { + if (fields.isValidSubset(set)) { + return true; + } + } + return false; + } else { + return false; + } + } + + public boolean isExactlyPartitionedOnFields(FieldList fields) { + return this.partitioning.isPartitionedOnKey() && fields.isExactMatch(this.partitioningFields); + } + + public boolean matchesOrderedPartitioning(Ordering o) { + if (this.partitioning == PartitioningProperty.RANGE_PARTITIONED) { + if (this.ordering.getNumberOfFields() > o.getNumberOfFields()) { + return false; + } + + for (int i = 0; i < this.ordering.getNumberOfFields(); i++) { + if (this.ordering.getFieldNumber(i) != o.getFieldNumber(i)) { + return false; + } + + // if this one request no order, everything is good + final Order oo = o.getOrder(i); + final Order to = this.ordering.getOrder(i); + if (oo != Order.NONE) { + if (oo == Order.ANY) { + // if any order is requested, any not NONE order is good + if (to == Order.NONE) { + return false; + } + } else if (oo != to) { + // the orders must be equal + return false; + } + } + } + return true; + } else { + return false; + } + } + + public boolean isFullyReplicated() { + return this.partitioning == PartitioningProperty.FULL_REPLICATION; + } + + /** + * Checks, if the properties in this object are trivial, i.e. only standard values. + */ + public boolean isTrivial() { + return partitioning == PartitioningProperty.RANDOM_PARTITIONED; + } + + /** + * This method resets the properties to a state where no properties are given. + */ + public void reset() { + this.partitioning = PartitioningProperty.RANDOM_PARTITIONED; + this.ordering = null; + this.partitioningFields = null; + } + + /** + * Filters these GlobalProperties by the fields that are forwarded to the output + * as described by the SemanticProperties. + * + * @param props The semantic properties holding information about forwarded fields. + * @param input The index of the input. + * @return The filtered GlobalProperties + */ + public GlobalProperties filterBySemanticProperties(SemanticProperties props, int input) { + + if (props == null) { + throw new NullPointerException("SemanticProperties may not be null."); + } + + GlobalProperties gp = new GlobalProperties(); + + // filter partitioning + switch(this.partitioning) { + case RANGE_PARTITIONED: + // check if ordering is preserved + Ordering newOrdering = new Ordering(); + for (int i = 0; i < this.ordering.getInvolvedIndexes().size(); i++) { + int sourceField = this.ordering.getInvolvedIndexes().get(i); + FieldSet targetField = props.getForwardingTargetFields(input, sourceField); + + if (targetField == null || targetField.size() == 0) { + // partitioning is destroyed + newOrdering = null; + break; + } else { + // use any field of target fields for now. We should use something like field equivalence sets in the future. + if(targetField.size() > 1) { + LOG.warn("Found that a field is forwarded to more than one target field in " + + "semantic forwarded field information. 
Will only use the field with the lowest index."); + } + newOrdering.appendOrdering(targetField.toArray()[0], this.ordering.getType(i), this.ordering.getOrder(i)); + } + } + if(newOrdering != null) { + gp.partitioning = PartitioningProperty.RANGE_PARTITIONED; + gp.ordering = newOrdering; + gp.partitioningFields = newOrdering.getInvolvedIndexes(); + } + break; + case HASH_PARTITIONED: + case ANY_PARTITIONING: + case CUSTOM_PARTITIONING: + FieldList newPartitioningFields = new FieldList(); + for (int sourceField : this.partitioningFields) { + FieldSet targetField = props.getForwardingTargetFields(input, sourceField); + + if (targetField == null || targetField.size() == 0) { + newPartitioningFields = null; + break; + } else { + // use any field of target fields for now. We should use something like field equivalence sets in the future. + if(targetField.size() > 1) { + LOG.warn("Found that a field is forwarded to more than one target field in " + + "semantic forwarded field information. Will only use the field with the lowest index."); + } + newPartitioningFields = newPartitioningFields.addField(targetField.toArray()[0]); + } + } + if(newPartitioningFields != null) { + gp.partitioning = this.partitioning; + gp.partitioningFields = newPartitioningFields; + gp.customPartitioner = this.customPartitioner; + } + break; + case FORCED_REBALANCED: + case FULL_REPLICATION: + case RANDOM_PARTITIONED: + gp.partitioning = this.partitioning; + break; + default: + throw new RuntimeException("Unknown partitioning type."); + } + + // filter unique field combinations + if (this.uniqueFieldCombinations != null) { + Set<FieldSet> newUniqueFieldCombinations = new HashSet<FieldSet>(); + for (FieldSet fieldCombo : this.uniqueFieldCombinations) { + FieldSet newFieldCombo = new FieldSet(); + for (Integer sourceField : fieldCombo) { + FieldSet targetField = props.getForwardingTargetFields(input, sourceField); + + if (targetField == null || targetField.size() == 0) { + newFieldCombo = null; + break; + } else { + // use any field of target fields for now. We should use something like field equivalence sets in the future. + if(targetField.size() > 1) { + LOG.warn("Found that a field is forwarded to more than one target field in " + + "semantic forwarded field information. Will only use the field with the lowest index."); + } + newFieldCombo = newFieldCombo.addField(targetField.toArray()[0]); + } + } + if (newFieldCombo != null) { + newUniqueFieldCombinations.add(newFieldCombo); + } + } + if(!newUniqueFieldCombinations.isEmpty()) { + gp.uniqueFieldCombinations = newUniqueFieldCombinations; + } + } + + return gp; + } + + + public void parameterizeChannel(Channel channel, boolean globalDopChange, + ExecutionMode exchangeMode, boolean breakPipeline) { + + ShipStrategyType shipType; + FieldList partitionKeys; + boolean[] sortDirection; + Partitioner<?> partitioner; + + switch (this.partitioning) { + case RANDOM_PARTITIONED: + shipType = globalDopChange ? 
ShipStrategyType.PARTITION_RANDOM : ShipStrategyType.FORWARD; + partitionKeys = null; + sortDirection = null; + partitioner = null; + break; + + case FULL_REPLICATION: + shipType = ShipStrategyType.BROADCAST; + partitionKeys = null; + sortDirection = null; + partitioner = null; + break; + + case ANY_PARTITIONING: + case HASH_PARTITIONED: + shipType = ShipStrategyType.PARTITION_HASH; + partitionKeys = Utils.createOrderedFromSet(this.partitioningFields); + sortDirection = null; + partitioner = null; + break; + + case RANGE_PARTITIONED: + shipType = ShipStrategyType.PARTITION_RANGE; + partitionKeys = this.ordering.getInvolvedIndexes(); + sortDirection = this.ordering.getFieldSortDirections(); + partitioner = null; + break; + + case FORCED_REBALANCED: + shipType = ShipStrategyType.PARTITION_RANDOM; + partitionKeys = null; + sortDirection = null; + partitioner = null; + break; + + case CUSTOM_PARTITIONING: + shipType = ShipStrategyType.PARTITION_CUSTOM; + partitionKeys = this.partitioningFields; + sortDirection = null; + partitioner = this.customPartitioner; + break; + + default: + throw new CompilerException("Unsupported partitioning strategy"); + } + + DataExchangeMode exMode = DataExchangeMode.select(exchangeMode, shipType, breakPipeline); + channel.setShipStrategy(shipType, partitionKeys, sortDirection, partitioner, exMode); + } + + // ------------------------------------------------------------------------ + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((partitioning == null) ? 0 : partitioning.ordinal()); + result = prime * result + ((partitioningFields == null) ? 0 : partitioningFields.hashCode()); + result = prime * result + ((ordering == null) ? 0 : ordering.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (obj != null && obj instanceof GlobalProperties) { + final GlobalProperties other = (GlobalProperties) obj; + return (this.partitioning == other.partitioning) + && (this.ordering == other.ordering || (this.ordering != null && this.ordering.equals(other.ordering))) + && (this.partitioningFields == other.partitioningFields || + (this.partitioningFields != null && this.partitioningFields.equals(other.partitioningFields))) + && (this.uniqueFieldCombinations == other.uniqueFieldCombinations || + (this.uniqueFieldCombinations != null && this.uniqueFieldCombinations.equals(other.uniqueFieldCombinations))); + } else { + return false; + } + } + + @Override + public String toString() { + final StringBuilder bld = new StringBuilder( + "GlobalProperties [partitioning=" + partitioning + + (this.partitioningFields == null ? "" : ", on fields " + this.partitioningFields) + + (this.ordering == null ? "" : ", with ordering " + this.ordering)); + + if (this.uniqueFieldCombinations == null) { + bld.append(']'); + } else { + bld.append(" - Unique field groups: "); + bld.append(this.uniqueFieldCombinations); + bld.append(']'); + } + return bld.toString(); + } + + @Override + public GlobalProperties clone() { + final GlobalProperties newProps = new GlobalProperties(); + newProps.partitioning = this.partitioning; + newProps.partitioningFields = this.partitioningFields; + newProps.ordering = this.ordering; + newProps.customPartitioner = this.customPartitioner; + newProps.uniqueFieldCombinations = this.uniqueFieldCombinations == null ? 
null : new HashSet<FieldSet>(this.uniqueFieldCombinations); + return newProps; + } + + // -------------------------------------------------------------------------------------------- + + public static GlobalProperties combine(GlobalProperties gp1, GlobalProperties gp2) { + if (gp1.isFullyReplicated()) { + if (gp2.isFullyReplicated()) { + return new GlobalProperties(); + } else { + return gp2; + } + } else if (gp2.isFullyReplicated()) { + return gp1; + } else if (gp1.ordering != null) { + return gp1; + } else if (gp2.ordering != null) { + return gp2; + } else if (gp1.partitioningFields != null) { + return gp1; + } else if (gp2.partitioningFields != null) { + return gp2; + } else if (gp1.uniqueFieldCombinations != null) { + return gp1; + } else if (gp2.uniqueFieldCombinations != null) { + return gp2; + } else if (gp1.getPartitioning().isPartitioned()) { + return gp1; + } else if (gp2.getPartitioning().isPartitioned()) { + return gp2; + } else { + return gp1; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/InterestingProperties.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/InterestingProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/InterestingProperties.java new file mode 100644 index 0000000..6946641 --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/InterestingProperties.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.dataproperties; + +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; + +import org.apache.flink.api.common.operators.SemanticProperties; +import org.apache.flink.optimizer.dag.OptimizerNode; +import org.apache.flink.optimizer.dag.SingleInputNode; +import org.apache.flink.optimizer.dag.TwoInputNode; + +/** + * Interesting properties are propagated from parent operators to child operators. They tell the child + * what data properties would help the parent in operating in a cheaper fashion. A reduce operator, for + * example, tells its child that partitioned data would help. If the child is a join operator, it can use + * that knowledge to favor strategies that leave the data in a partitioned form. + * + * More on optimization with interesting properties can be found in the works on + * the volcano- and cascades optimizer framework. + */ +public class InterestingProperties implements Cloneable { + + private Set<RequestedGlobalProperties> globalProps; // the global properties, i.e. 
properties across partitions + + private Set<RequestedLocalProperties> localProps; // the local properties, i.e. properties within partitions + + // ------------------------------------------------------------------------ + + public InterestingProperties() { + this.globalProps = new HashSet<RequestedGlobalProperties>(); + this.localProps = new HashSet<RequestedLocalProperties>(); + } + + /** + * Private constructor for cloning purposes. + * + * @param globalProps The global properties for this new object. + * @param localProps The local properties for this new object. + */ + private InterestingProperties(Set<RequestedGlobalProperties> globalProps, Set<RequestedLocalProperties> localProps) { + this.globalProps = globalProps; + this.localProps = localProps; + } + + // ------------------------------------------------------------------------ + + public void addGlobalProperties(RequestedGlobalProperties props) { + this.globalProps.add(props); + } + + public void addLocalProperties(RequestedLocalProperties props) { + this.localProps.add(props); + } + + public void addInterestingProperties(InterestingProperties other) { + this.globalProps.addAll(other.globalProps); + this.localProps.addAll(other.localProps); + } + + /** + * Gets the interesting local properties. + * + * @return The interesting local properties. + */ + public Set<RequestedLocalProperties> getLocalProperties() { + return this.localProps; + } + + /** + * Gets the interesting global properties. + * + * @return The interesting global properties. + */ + public Set<RequestedGlobalProperties> getGlobalProperties() { + return this.globalProps; + } + + public InterestingProperties filterByCodeAnnotations(OptimizerNode node, int input) { + InterestingProperties iProps = new InterestingProperties(); + SemanticProperties props; + if (node instanceof SingleInputNode || node instanceof TwoInputNode) { + props = node.getSemanticProperties(); + } else { + props = new SemanticProperties.EmptySemanticProperties(); + } + + for (RequestedGlobalProperties rgp : this.globalProps) { + RequestedGlobalProperties filtered = rgp.filterBySemanticProperties(props, input); + if (filtered != null && !filtered.isTrivial()) { + iProps.addGlobalProperties(filtered); + } + } + for (RequestedLocalProperties rlp : this.localProps) { + RequestedLocalProperties filtered = rlp.filterBySemanticProperties(props, input); + if (filtered != null && !filtered.isTrivial()) { + iProps.addLocalProperties(filtered); + } + } + return iProps; + } + + public void dropTrivials() { + for (Iterator<RequestedGlobalProperties> iter = this.globalProps.iterator(); iter.hasNext();) { + RequestedGlobalProperties gp = iter.next(); + if (gp.isTrivial()) { + iter.remove(); + break; + } + } + + for (Iterator<RequestedLocalProperties> iter = this.localProps.iterator(); iter.hasNext();) { + RequestedLocalProperties lp = iter.next(); + if (lp.isTrivial()) { + iter.remove(); + break; + } + } + } + + // ------------------------------------------------------------------------ + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((globalProps == null) ? 0 : globalProps.hashCode()); + result = prime * result + ((localProps == null) ? 
0 : localProps.hashCode());
+		return result;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj != null && obj instanceof InterestingProperties) {
+			InterestingProperties other = (InterestingProperties) obj;
+			return this.globalProps.equals(other.globalProps) &&
+					this.localProps.equals(other.localProps);
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public String toString() {
+		return "InterestingProperties [globalProps=" + this.globalProps +
+				", localProps=" + this.localProps + " ]";
+	}
+
+	@Override
+	public InterestingProperties clone() {
+		HashSet<RequestedGlobalProperties> globalProps = new HashSet<RequestedGlobalProperties>();
+		for (RequestedGlobalProperties p : this.globalProps) {
+			globalProps.add(p.clone());
+		}
+		HashSet<RequestedLocalProperties> localProps = new HashSet<RequestedLocalProperties>();
+		for (RequestedLocalProperties p : this.localProps) {
+			localProps.add(p.clone());
+		}
+
+		return new InterestingProperties(globalProps, localProps);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/LocalProperties.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/LocalProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/LocalProperties.java
new file mode 100644
index 0000000..e0231aa
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/LocalProperties.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.dataproperties;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class represents local properties of the data. A local property is a property that exists
+ * within the data of a single partition, such as sort order, or data grouping.
+ */
+public class LocalProperties implements Cloneable {
+
+	public static final Logger LOG = LoggerFactory.getLogger(LocalProperties.class);
+
+	public static final LocalProperties EMPTY = new LocalProperties();
+
+	// --------------------------------------------------------------------------------------------
+
+	private Ordering ordering;			// order inside a partition, null if not ordered
+
+	private FieldList groupedFields;		// fields by which the stream is grouped. null if not grouped.
+ + private Set<FieldSet> uniqueFields; // fields whose value combination is unique in the stream + + // -------------------------------------------------------------------------------------------- + + /** + * Default constructor for trivial local properties. No order, no grouping, no uniqueness. + */ + public LocalProperties() {} + + // -------------------------------------------------------------------------------------------- + + /** + * Gets the key order. + * + * @return The key order, or <code>null</code> if nothing is ordered. + */ + public Ordering getOrdering() { + return ordering; + } + + /** + * Gets the grouped fields. + * + * @return The grouped fields, or <code>null</code> if nothing is grouped. + */ + public FieldList getGroupedFields() { + return this.groupedFields; + } + + /** + * Gets the fields whose combination is unique within the data set. + * + * @return The unique field combination, or <code>null</code> if nothing is unique. + */ + public Set<FieldSet> getUniqueFields() { + return this.uniqueFields; + } + + /** + * Checks whether the given set of fields is unique, as specified in these local properties. + * + * @param set The set to check. + * @return True, if the given column combination is unique, false if not. + */ + public boolean areFieldsUnique(FieldSet set) { + return this.uniqueFields != null && this.uniqueFields.contains(set); + } + + /** + * Adds a combination of fields that are unique in these data properties. + * + * @param uniqueFields The fields that are unique in these data properties. + */ + public LocalProperties addUniqueFields(FieldSet uniqueFields) { + LocalProperties copy = clone(); + + if (copy.uniqueFields == null) { + copy.uniqueFields = new HashSet<FieldSet>(); + } + copy.uniqueFields.add(uniqueFields); + return copy; + } + + public LocalProperties clearUniqueFieldSets() { + if (this.uniqueFields == null || this.uniqueFields.isEmpty()) { + return this; + } else { + LocalProperties copy = new LocalProperties(); + copy.ordering = this.ordering; + copy.groupedFields = this.groupedFields; + return copy; + } + } + + /** + * Checks, if the properties in this object are trivial, i.e. only standard values. + */ + public boolean isTrivial() { + return ordering == null && this.groupedFields == null && this.uniqueFields == null; + } + + // -------------------------------------------------------------------------------------------- + + /** + * Filters these LocalProperties by the fields that are forwarded to the output + * as described by the SemanticProperties. + * + * @param props The semantic properties holding information about forwarded fields. + * @param input The index of the input. + * @return The filtered LocalProperties + */ + public LocalProperties filterBySemanticProperties(SemanticProperties props, int input) { + + if (props == null) { + throw new NullPointerException("SemanticProperties may not be null."); + } + + LocalProperties returnProps = new LocalProperties(); + + // check if sorting is preserved + if (this.ordering != null) { + Ordering newOrdering = new Ordering(); + + for (int i = 0; i < this.ordering.getInvolvedIndexes().size(); i++) { + int sourceField = this.ordering.getInvolvedIndexes().get(i); + FieldSet targetField = props.getForwardingTargetFields(input, sourceField); + if (targetField == null || targetField.size() == 0) { + if (i == 0) { + // order fully destroyed + newOrdering = null; + break; + } else { + // order partially preserved + break; + } + } else { + // use any field of target fields for now. 
+                    if (targetField.size() > 1) {
+                        LOG.warn("Found that a field is forwarded to more than one target field in " +
+                                "semantic forwarded field information. Will only use the field with the lowest index.");
+                    }
+                    newOrdering.appendOrdering(targetField.toArray()[0], this.ordering.getType(i), this.ordering.getOrder(i));
+                }
+            }
+
+            returnProps.ordering = newOrdering;
+            if (newOrdering != null) {
+                returnProps.groupedFields = newOrdering.getInvolvedIndexes();
+            } else {
+                returnProps.groupedFields = null;
+            }
+        }
+        // check if grouping is preserved
+        else if (this.groupedFields != null) {
+            FieldList newGroupedFields = new FieldList();
+
+            for (Integer sourceField : this.groupedFields) {
+                FieldSet targetField = props.getForwardingTargetFields(input, sourceField);
+                if (targetField == null || targetField.size() == 0) {
+                    newGroupedFields = null;
+                    break;
+                } else {
+                    // use any field of target fields for now. We should use something like
+                    // field equivalence sets in the future.
+                    if (targetField.size() > 1) {
+                        LOG.warn("Found that a field is forwarded to more than one target field in " +
+                                "semantic forwarded field information. Will only use the field with the lowest index.");
+                    }
+                    newGroupedFields = newGroupedFields.addField(targetField.toArray()[0]);
+                }
+            }
+            returnProps.groupedFields = newGroupedFields;
+        }
+
+        if (this.uniqueFields != null) {
+            Set<FieldSet> newUniqueFields = new HashSet<FieldSet>();
+            for (FieldSet fields : this.uniqueFields) {
+                FieldSet newFields = new FieldSet();
+                for (Integer sourceField : fields) {
+                    FieldSet targetField = props.getForwardingTargetFields(input, sourceField);
+
+                    if (targetField == null || targetField.size() == 0) {
+                        newFields = null;
+                        break;
+                    } else {
+                        // use any field of target fields for now. We should use something like
+                        // field equivalence sets in the future.
+                        if (targetField.size() > 1) {
+                            LOG.warn("Found that a field is forwarded to more than one target field in " +
+                                    "semantic forwarded field information. Will only use the field with the lowest index.");
+                        }
+                        newFields = newFields.addField(targetField.toArray()[0]);
+                    }
+                }
+                if (newFields != null) {
+                    newUniqueFields.add(newFields);
+                }
+            }
+
+            if (!newUniqueFields.isEmpty()) {
+                returnProps.uniqueFields = newUniqueFields;
+            } else {
+                returnProps.uniqueFields = null;
+            }
+        }
+
+        return returnProps;
+    }
+    // --------------------------------------------------------------------------------------------
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + (this.ordering == null ? 0 : this.ordering.hashCode());
+        result = prime * result + (this.groupedFields == null ? 0 : this.groupedFields.hashCode());
+        result = prime * result + (this.uniqueFields == null ? 0 : this.uniqueFields.hashCode());
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj instanceof LocalProperties) {
+            final LocalProperties other = (LocalProperties) obj;
+            return (ordering == other.getOrdering() || (ordering != null && ordering.equals(other.getOrdering()))) &&
+                    (groupedFields == other.getGroupedFields() || (groupedFields != null && groupedFields.equals(other.getGroupedFields()))) &&
+                    (uniqueFields == other.getUniqueFields() || (uniqueFields != null && uniqueFields.equals(other.getUniqueFields())));
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "LocalProperties [ordering=" + this.ordering + ", grouped=" + this.groupedFields +
+                ", unique=" + this.uniqueFields + "]";
+    }
+
+    @Override
+    public LocalProperties clone() {
+        LocalProperties copy = new LocalProperties();
+        copy.ordering = this.ordering;
+        copy.groupedFields = this.groupedFields;
+        copy.uniqueFields = (this.uniqueFields == null ? null : new HashSet<FieldSet>(this.uniqueFields));
+        return copy;
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Combines the local properties of two inputs, returning the "richer" of the two:
+     * an ordering wins over a grouping, which wins over unique fields; ties prefer the
+     * first input.
+     */
+    public static LocalProperties combine(LocalProperties lp1, LocalProperties lp2) {
+        if (lp1.ordering != null) {
+            return lp1;
+        } else if (lp2.ordering != null) {
+            return lp2;
+        } else if (lp1.groupedFields != null) {
+            return lp1;
+        } else if (lp2.groupedFields != null) {
+            return lp2;
+        } else if (lp1.uniqueFields != null && !lp1.uniqueFields.isEmpty()) {
+            return lp1;
+        } else if (lp2.uniqueFields != null && !lp2.uniqueFields.isEmpty()) {
+            return lp2;
+        } else {
+            return lp1;
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Creates LocalProperties for the given ordering. The result is also grouped on the
+     * ordering's fields.
+     */
+    public static LocalProperties forOrdering(Ordering o) {
+        LocalProperties props = new LocalProperties();
+        props.ordering = o;
+        props.groupedFields = o.getInvolvedIndexes();
+        return props;
+    }
+
+    /**
+     * Creates LocalProperties for data grouped on the given fields.
+     */
+    public static LocalProperties forGrouping(FieldList groupedFields) {
+        LocalProperties props = new LocalProperties();
+        props.groupedFields = groupedFields;
+        return props;
+    }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/PartitioningProperty.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/PartitioningProperty.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/PartitioningProperty.java
new file mode 100644
index 0000000..5e06dd3
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/PartitioningProperty.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dataproperties;
+
+/**
+ * An enumeration of the different ways of distributing data across partitions or
+ * parallel workers.
+ */
+public enum PartitioningProperty {
+
+    /**
+     * Any possible way of data distribution, including random partitioning and full replication.
+     */
+    ANY_DISTRIBUTION,
+
+    /**
+     * A random, disjoint (non-replicated) data distribution, where each datum is contained in one partition only.
+     * This is for example the result of parallel scans of data in a file system like HDFS,
+     * or the result of a round-robin data distribution.
+     */
+    RANDOM_PARTITIONED,
+
+    /**
+     * A hash partitioning on a certain key.
+     */
+    HASH_PARTITIONED,
+
+    /**
+     * A range partitioning on a certain key.
+     */
+    RANGE_PARTITIONED,
+
+    /**
+     * A partitioning on a key that is not further specified (hash partitioning, range partitioning,
+     * or some other scheme).
+     */
+    ANY_PARTITIONING,
+
+    /**
+     * Full replication of the data to each parallel instance.
+     */
+    FULL_REPLICATION,
+
+    /**
+     * A forced even re-balancing. All partitions are guaranteed to have almost the same number of records.
+     */
+    FORCED_REBALANCED,
+
+    /**
+     * A custom partitioning, accompanied by a {@link org.apache.flink.api.common.functions.Partitioner}.
+     */
+    CUSTOM_PARTITIONING;
+
+    /**
+     * Checks, if this property represents in fact a partitioning. That is, whether this
+     * property is none of <tt>FULL_REPLICATION</tt>, <tt>FORCED_REBALANCED</tt>, or
+     * <tt>ANY_DISTRIBUTION</tt>.
+     *
+     * @return True, if this enum constant represents a partitioning, false otherwise.
+     */
+    public boolean isPartitioned() {
+        return this != FULL_REPLICATION && this != FORCED_REBALANCED && this != ANY_DISTRIBUTION;
+    }
+
+    /**
+     * Checks, if this property represents a full replication.
+     *
+     * @return True, if this enum constant is equal to <tt>PartitioningProperty.FULL_REPLICATION</tt>,
+     *         false otherwise.
+     */
+    public boolean isReplication() {
+        return this == FULL_REPLICATION;
+    }
+
+    /**
+     * Checks if this property represents a partitioning that is not random, but on a partitioning key.
+     *
+     * @return True, if the data is partitioned on a key.
+     */
+    public boolean isPartitionedOnKey() {
+        return isPartitioned() && this != RANDOM_PARTITIONED;
+    }
+
+    /**
+     * Checks, if this property represents a partitioning that is computable.
+     * A computable partitioning can be recreated through an algorithm. If two sets of data are to
+     * be co-partitioned, it is crucial that the partitioning schemes are computable.
+     * <p>
+     * Examples for computable partitioning schemes are hash- or range-partitioning. An example for a non-computable
+     * partitioning is the implicit partitioning that exists through a globally unique key.
+     *
+     * @return True, if this enum constant is a re-computable partitioning.
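+     *         (This holds exactly for <tt>HASH_PARTITIONED</tt>, <tt>RANGE_PARTITIONED</tt>,
+     *         and <tt>CUSTOM_PARTITIONING</tt>.)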
+     */
+    public boolean isComputablyPartitioned() {
+        return this == HASH_PARTITIONED || this == RANGE_PARTITIONED || this == CUSTOM_PARTITIONING;
+    }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/RequestedGlobalProperties.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/RequestedGlobalProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/RequestedGlobalProperties.java
new file mode 100644
index 0000000..8c3f6bd
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/RequestedGlobalProperties.java
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.dataproperties;
+
+import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.util.Utils;
+import org.apache.flink.runtime.io.network.DataExchangeMode;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+
+/**
+ * This class represents the global properties of the data that are requested by an operator.
+ * Operators request the global properties they need for correct execution. The following list
+ * gives examples of global properties requested by certain operators:
+ * <ul>
+ * <li>"groupBy/reduce" will request the data to be partitioned in some way on the key fields.</li>
+ * <li>"map" will request the data to be in an arbitrary distribution - it has no prerequisites.</li>
+ * <li>"join" will request certain properties for each input. This class represents the properties
+ * on an input alone.
+ * The properties may be partitioning on the key fields, or a combination of
+ * replication on one input and anything-but-replication on the other input.</li>
+ * </ul>
+ */
+public final class RequestedGlobalProperties implements Cloneable {
+
+    private PartitioningProperty partitioning;  // the type of partitioning
+
+    private FieldSet partitioningFields;        // the fields which are partitioned
+
+    private Ordering ordering;                  // order of the partitioned fields, if it is an ordered (range) partitioning
+
+    private DataDistribution dataDistribution;  // optional data distribution, for a range partitioning
+
+    private Partitioner<?> customPartitioner;   // optional, partitioner for custom partitioning
+
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Initializes the global properties with no partitioning.
+     */
+    public RequestedGlobalProperties() {
+        this.partitioning = PartitioningProperty.RANDOM_PARTITIONED;
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Sets these properties to request a hash partitioning on the given fields.
+     *
+     * If the fields are provided as {@link FieldSet}, then any permutation of the fields is a
+     * valid partitioning, including subsets. If the fields are given as a {@link FieldList},
+     * then only an exact partitioning on the fields matches this requested partitioning.
+     *
+     * @param partitionedFields The key fields for the partitioning.
+     */
+    public void setHashPartitioned(FieldSet partitionedFields) {
+        if (partitionedFields == null) {
+            throw new NullPointerException();
+        }
+        this.partitioning = PartitioningProperty.HASH_PARTITIONED;
+        this.partitioningFields = partitionedFields;
+        this.ordering = null;
+    }
+
+    /**
+     * Sets these properties to request a range partitioning with the given ordering.
+     *
+     * @param ordering The ordering that defines the required range partitioning.
+     */
+    public void setRangePartitioned(Ordering ordering) {
+        this.setRangePartitioned(ordering, null);
+    }
+
+    /**
+     * Sets these properties to request a range partitioning with the given ordering and an
+     * optional data distribution.
+     *
+     * @param ordering The ordering that defines the required range partitioning.
+     * @param dataDistribution The data distribution for the range partitioning, or null.
+     */
+    public void setRangePartitioned(Ordering ordering, DataDistribution dataDistribution) {
+        if (ordering == null) {
+            throw new NullPointerException();
+        }
+        this.partitioning = PartitioningProperty.RANGE_PARTITIONED;
+        this.ordering = ordering;
+        this.partitioningFields = null;
+        this.dataDistribution = dataDistribution;
+    }
+
+    /**
+     * Sets these properties to request some partitioning on the given fields. This will allow
+     * both hash partitioning and range partitioning to match.
+     *
+     * If the fields are provided as {@link FieldSet}, then any permutation of the fields is a
+     * valid partitioning, including subsets. If the fields are given as a {@link FieldList},
+     * then only an exact partitioning on the fields matches this requested partitioning.
+     *
+     * @param partitionedFields The key fields for the partitioning.
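+     *        For example, a request on {@code new FieldSet(1, 2)} is met by a hash
+     *        partitioning on field 2 alone, whereas a request on {@code new FieldList(1, 2)}
+     *        is only met by a partitioning on exactly the fields (1, 2) in that order.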
+     */
+    public void setAnyPartitioning(FieldSet partitionedFields) {
+        if (partitionedFields == null) {
+            throw new NullPointerException();
+        }
+        this.partitioning = PartitioningProperty.ANY_PARTITIONING;
+        this.partitioningFields = partitionedFields;
+        this.ordering = null;
+    }
+
+    public void setRandomPartitioning() {
+        this.partitioning = PartitioningProperty.RANDOM_PARTITIONED;
+        this.partitioningFields = null;
+        this.ordering = null;
+    }
+
+    public void setAnyDistribution() {
+        this.partitioning = PartitioningProperty.ANY_DISTRIBUTION;
+        this.partitioningFields = null;
+        this.ordering = null;
+    }
+
+    public void setFullyReplicated() {
+        this.partitioning = PartitioningProperty.FULL_REPLICATION;
+        this.partitioningFields = null;
+        this.ordering = null;
+    }
+
+    public void setForceRebalancing() {
+        this.partitioning = PartitioningProperty.FORCED_REBALANCED;
+        this.partitioningFields = null;
+        this.ordering = null;
+    }
+
+    /**
+     * Sets these properties to request a custom partitioning with the given {@link Partitioner} instance.
+     *
+     * If the fields are provided as {@link FieldSet}, then any permutation of the fields is a
+     * valid partitioning, including subsets. If the fields are given as a {@link FieldList},
+     * then only an exact partitioning on the fields matches this requested partitioning.
+     *
+     * @param partitionedFields The key fields for the partitioning.
+     * @param partitioner The custom partitioner to be used.
+     */
+    public void setCustomPartitioned(FieldSet partitionedFields, Partitioner<?> partitioner) {
+        if (partitionedFields == null || partitioner == null) {
+            throw new NullPointerException();
+        }
+
+        this.partitioning = PartitioningProperty.CUSTOM_PARTITIONING;
+        this.partitioningFields = partitionedFields;
+        this.ordering = null;
+        this.customPartitioner = partitioner;
+    }
+
+    /**
+     * Gets the partitioning property.
+     *
+     * @return The partitioning property.
+     */
+    public PartitioningProperty getPartitioning() {
+        return partitioning;
+    }
+
+    /**
+     * Gets the fields on which the data is partitioned.
+     *
+     * @return The partitioning fields.
+     */
+    public FieldSet getPartitionedFields() {
+        return this.partitioningFields;
+    }
+
+    /**
+     * Gets the key order.
+     *
+     * @return The key order.
+     */
+    public Ordering getOrdering() {
+        return this.ordering;
+    }
+
+    /**
+     * Gets the data distribution.
+     *
+     * @return The data distribution.
+     */
+    public DataDistribution getDataDistribution() {
+        return this.dataDistribution;
+    }
+
+    /**
+     * Gets the custom partitioner associated with these properties.
+     *
+     * @return The custom partitioner associated with these properties.
+     */
+    public Partitioner<?> getCustomPartitioner() {
+        return customPartitioner;
+    }
+
+    /**
+     * Checks, if the properties in this object are trivial, i.e. only standard values.
+     */
+    public boolean isTrivial() {
+        return this.partitioning == null || this.partitioning == PartitioningProperty.RANDOM_PARTITIONED;
+    }
+
+    /**
+     * This method resets the properties to a state where no properties are given.
+     */
+    public void reset() {
+        this.partitioning = PartitioningProperty.RANDOM_PARTITIONED;
+        this.ordering = null;
+        this.partitioningFields = null;
+        this.dataDistribution = null;
+        this.customPartitioner = null;
+    }
+
+    /**
+     * Filters these properties by what can be preserved by the given SemanticProperties when propagated down
+     * to the given input.
+     *
+     * @param props The SemanticProperties which define which fields are preserved.
+     * @param input The index of the operator's input.
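+     *        (For example, if the operator forwards input field 2 to output field 0, a request
+     *        for a partitioning on output field 0 filters to a request on input field 2.)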
+     * @return The filtered RequestedGlobalProperties.
+     */
+    public RequestedGlobalProperties filterBySemanticProperties(SemanticProperties props, int input) {
+        // the semantic properties may not be null
+        if (props == null) {
+            throw new NullPointerException("SemanticProperties may not be null.");
+        }
+
+        RequestedGlobalProperties rgProp = new RequestedGlobalProperties();
+
+        switch (this.partitioning) {
+            case FULL_REPLICATION:
+            case FORCED_REBALANCED:
+            case CUSTOM_PARTITIONING:
+            case RANDOM_PARTITIONED:
+            case ANY_DISTRIBUTION:
+                // make sure that certain properties are not pushed down
+                return null;
+            case HASH_PARTITIONED:
+            case ANY_PARTITIONING:
+                FieldSet newFields;
+                if (this.partitioningFields instanceof FieldList) {
+                    newFields = new FieldList();
+                } else {
+                    newFields = new FieldSet();
+                }
+
+                for (Integer targetField : this.partitioningFields) {
+                    int sourceField = props.getForwardingSourceField(input, targetField);
+                    if (sourceField >= 0) {
+                        newFields = newFields.addField(sourceField);
+                    } else {
+                        // partial partitionings are not preserved to avoid skewed partitioning
+                        return null;
+                    }
+                }
+                rgProp.partitioning = this.partitioning;
+                rgProp.partitioningFields = newFields;
+                return rgProp;
+            case RANGE_PARTITIONED:
+                // range partitioning
+                Ordering newOrdering = new Ordering();
+                for (int i = 0; i < this.ordering.getInvolvedIndexes().size(); i++) {
+                    int value = this.ordering.getInvolvedIndexes().get(i);
+                    int sourceField = props.getForwardingSourceField(input, value);
+                    if (sourceField >= 0) {
+                        newOrdering.appendOrdering(sourceField, this.ordering.getType(i), this.ordering.getOrder(i));
+                    } else {
+                        return null;
+                    }
+                }
+                rgProp.partitioning = this.partitioning;
+                rgProp.ordering = newOrdering;
+                rgProp.dataDistribution = this.dataDistribution;
+                return rgProp;
+            default:
+                throw new RuntimeException("Unknown partitioning type encountered.");
+        }
+    }
+
+    /**
+     * Checks, if this set of requested properties is met by the given produced properties.
+     *
+     * @param props The produced properties to check against these requested properties.
+     * @return True, if the properties are met, false otherwise.
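+     *         For example, a request for ANY_PARTITIONING on field 0 is met by produced
+     *         properties that are HASH_PARTITIONED on field 0, while a request for
+     *         FULL_REPLICATION is met only by fully replicated data.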
+     */
+    public boolean isMetBy(GlobalProperties props) {
+        if (this.partitioning == PartitioningProperty.ANY_DISTRIBUTION) {
+            return true;
+        } else if (this.partitioning == PartitioningProperty.FULL_REPLICATION) {
+            return props.isFullyReplicated();
+        }
+        else if (props.isFullyReplicated()) {
+            return false;
+        }
+        else if (this.partitioning == PartitioningProperty.RANDOM_PARTITIONED) {
+            return true;
+        }
+        else if (this.partitioning == PartitioningProperty.ANY_PARTITIONING) {
+            return checkCompatiblePartitioningFields(props);
+        }
+        else if (this.partitioning == PartitioningProperty.HASH_PARTITIONED) {
+            return props.getPartitioning() == PartitioningProperty.HASH_PARTITIONED &&
+                    checkCompatiblePartitioningFields(props);
+        }
+        else if (this.partitioning == PartitioningProperty.RANGE_PARTITIONED) {
+            return props.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED &&
+                    props.matchesOrderedPartitioning(this.ordering);
+        }
+        else if (this.partitioning == PartitioningProperty.FORCED_REBALANCED) {
+            return props.getPartitioning() == PartitioningProperty.FORCED_REBALANCED;
+        }
+        else if (this.partitioning == PartitioningProperty.CUSTOM_PARTITIONING) {
+            return props.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING &&
+                    checkCompatiblePartitioningFields(props) &&
+                    props.getCustomPartitioner().equals(this.customPartitioner);
+        }
+        else {
+            throw new CompilerException("Properties matching logic leaves open cases.");
+        }
+    }
+
+    /**
+     * Parameterizes the ship strategy fields of a channel such that the channel produces
+     * the desired global properties.
+     *
+     * @param channel The channel to parameterize.
+     * @param globalDopChange Flag indicating whether the degree of parallelism changes
+     *                        between sender and receiver.
+     * @param exchangeMode The mode of data exchange (pipelined, always batch,
+     *                     batch only on shuffle, ...)
+     * @param breakPipeline Indicates whether this data exchange should break
+     *                      pipelines (unless pipelines are forced).
+     */
+    public void parameterizeChannel(Channel channel, boolean globalDopChange,
+                                    ExecutionMode exchangeMode, boolean breakPipeline) {
+
+        // safety check. Fully replicated input must be preserved.
+        if (channel.getSource().getGlobalProperties().isFullyReplicated() &&
+                !(this.partitioning == PartitioningProperty.FULL_REPLICATION ||
+                        this.partitioning == PartitioningProperty.ANY_DISTRIBUTION))
+        {
+            throw new CompilerException("Fully replicated input must be preserved " +
+                    "and may not be converted into another global property.");
+        }
+
+        // if we request nothing, we need no special strategy: forward if the parallelism
+        // remains the same, otherwise repartition randomly
+        if (isTrivial() || this.partitioning == PartitioningProperty.ANY_DISTRIBUTION) {
+            ShipStrategyType shipStrategy = globalDopChange ? ShipStrategyType.PARTITION_RANDOM : ShipStrategyType.FORWARD;
+
+            DataExchangeMode em = DataExchangeMode.select(exchangeMode, shipStrategy, breakPipeline);
+            channel.setShipStrategy(shipStrategy, em);
+            return;
+        }
+
+        final GlobalProperties inGlobals = channel.getSource().getGlobalProperties();
+        // if we have no global parallelism change, check if we have already compatible global properties
+        if (!globalDopChange && isMetBy(inGlobals)) {
+            DataExchangeMode em = DataExchangeMode.select(exchangeMode, ShipStrategyType.FORWARD, breakPipeline);
+            channel.setShipStrategy(ShipStrategyType.FORWARD, em);
+            return;
+        }
+
+        // if we fall through the conditions to here, we need to re-establish the requested
+        // properties through a data exchange
+        ShipStrategyType shipType;
+        FieldList partitionKeys;
+        boolean[] sortDirection;
+        Partitioner<?> partitioner;
+
+        switch (this.partitioning) {
+            case FULL_REPLICATION:
+                shipType = ShipStrategyType.BROADCAST;
+                partitionKeys = null;
+                sortDirection = null;
+                partitioner = null;
+                break;
+
+            case ANY_PARTITIONING:
+            case HASH_PARTITIONED:
+                shipType = ShipStrategyType.PARTITION_HASH;
+                partitionKeys = Utils.createOrderedFromSet(this.partitioningFields);
+                sortDirection = null;
+                partitioner = null;
+                break;
+
+            case RANGE_PARTITIONED:
+                shipType = ShipStrategyType.PARTITION_RANGE;
+                partitionKeys = this.ordering.getInvolvedIndexes();
+                sortDirection = this.ordering.getFieldSortDirections();
+                partitioner = null;
+
+                if (this.dataDistribution != null) {
+                    channel.setDataDistribution(this.dataDistribution);
+                }
+                break;
+
+            case FORCED_REBALANCED:
+                shipType = ShipStrategyType.PARTITION_FORCED_REBALANCE;
+                partitionKeys = null;
+                sortDirection = null;
+                partitioner = null;
+                break;
+
+            case CUSTOM_PARTITIONING:
+                shipType = ShipStrategyType.PARTITION_CUSTOM;
+                partitionKeys = Utils.createOrderedFromSet(this.partitioningFields);
+                sortDirection = null;
+                partitioner = this.customPartitioner;
+                break;
+
+            default:
+                throw new CompilerException("Invalid partitioning to create through a data exchange: " +
+                        this.partitioning.name());
+        }
+
+        DataExchangeMode exMode = DataExchangeMode.select(exchangeMode, shipType, breakPipeline);
+        channel.setShipStrategy(shipType, partitionKeys, sortDirection, partitioner, exMode);
+    }
+
+    // ------------------------------------------------------------------------
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((partitioning == null) ? 0 : partitioning.ordinal());
+        result = prime * result + ((partitioningFields == null) ? 0 : partitioningFields.hashCode());
+        result = prime * result + ((ordering == null) ? 0 : ordering.hashCode());
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj instanceof RequestedGlobalProperties) {
+            RequestedGlobalProperties other = (RequestedGlobalProperties) obj;
+            return (ordering == other.getOrdering() || (ordering != null && ordering.equals(other.getOrdering())))
+                    && (partitioning == other.getPartitioning())
+                    && (partitioningFields == other.partitioningFields ||
+                            (partitioningFields != null && partitioningFields.equals(other.getPartitionedFields())));
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "Requested Global Properties [partitioning=" + partitioning +
+                (this.partitioningFields == null ? "" : ", on fields " + this.partitioningFields) +
+                (this.ordering == null ? "" : ", with ordering " + this.ordering) + "]";
"" : ", with ordering " + this.ordering) + "]"; + } + + public RequestedGlobalProperties clone() { + try { + return (RequestedGlobalProperties) super.clone(); + } catch (CloneNotSupportedException cnse) { + // should never happen, but propagate just in case + throw new RuntimeException(cnse); + } + } + + private boolean checkCompatiblePartitioningFields(GlobalProperties props) { + if(this.partitioningFields instanceof FieldList) { + // partitioningFields as FieldList requires strict checking! + return props.isExactlyPartitionedOnFields((FieldList)this.partitioningFields); + } else { + return props.isPartitionedOnFields(this.partitioningFields); + } + } +}