http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/InterestingProperties.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/InterestingProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/InterestingProperties.java index ced0e83..6946641 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/InterestingProperties.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/InterestingProperties.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.optimizer.dataproperties; import java.util.HashSet; @@ -29,13 +28,16 @@ import org.apache.flink.optimizer.dag.SingleInputNode; import org.apache.flink.optimizer.dag.TwoInputNode; /** - * The interesting properties that a node in the optimizer plan hands to its predecessors. It has the - * purpose to tell the preceding nodes, which data properties might have the advantage, because they would - * let the node fulfill its pact cheaper. More on optimization with interesting properties can be found - * in the works on the volcano- and cascades optimizer framework. + * Interesting properties are propagated from parent operators to child operators. They tell the child + * what data properties would help the parent in operating in a cheaper fashion. A reduce operator, for + * example, tells its child that partitioned data would help. If the child is a join operator, it can use + * that knowledge to favor strategies that leave the data in a partitioned form. + * + * More on optimization with interesting properties can be found in the works on + * the volcano- and cascades optimizer framework. */ -public class InterestingProperties implements Cloneable -{ +public class InterestingProperties implements Cloneable { + private Set<RequestedGlobalProperties> globalProps; // the global properties, i.e. properties across partitions private Set<RequestedLocalProperties> localProps; // the local properties, i.e. properties within partitions @@ -91,8 +93,7 @@ public class InterestingProperties implements Cloneable return this.globalProps; } - public InterestingProperties filterByCodeAnnotations(OptimizerNode node, int input) - { + public InterestingProperties filterByCodeAnnotations(OptimizerNode node, int input) { InterestingProperties iProps = new InterestingProperties(); SemanticProperties props; if (node instanceof SingleInputNode || node instanceof TwoInputNode) {
http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/LocalProperties.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/LocalProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/LocalProperties.java index 0c3ea12..e0231aa 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/LocalProperties.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/LocalProperties.java @@ -31,7 +31,7 @@ import org.slf4j.LoggerFactory; /** * This class represents local properties of the data. A local property is a property that exists - * within the data of a single partition. + * within the data of a single partition, such as sort order, or data grouping. */ public class LocalProperties implements Cloneable { http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/PartitioningProperty.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/PartitioningProperty.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/PartitioningProperty.java index 674cdb8..5e06dd3 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/PartitioningProperty.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/PartitioningProperty.java @@ -19,42 +19,45 @@ package org.apache.flink.optimizer.dataproperties; /** - * An enumeration tracking the different types of sharding strategies. + * An enumeration of the the different types of distributing data across partitions or + * parallel workers. */ public enum PartitioningProperty { /** - * Any data distribution, i.e., random partitioning or full replication. + * Any possible way of data distribution, including random partitioning and full replication. */ ANY_DISTRIBUTION, /** - * Constant indicating no particular partitioning (i.e. random) data distribution. + * A random disjunct (non-replicated) data distribution, where each datum is contained in one partition only. + * This is for example the result of parallel scans of data in a file system like HDFS, + * or the result of a round-robin data distribution. */ RANDOM_PARTITIONED, /** - * Constant indicating a hash partitioning. + * A hash partitioning on a certain key. */ HASH_PARTITIONED, /** - * Constant indicating a range partitioning. + * A range partitioning on a certain key. */ RANGE_PARTITIONED, /** - * Constant indicating any not further specified disjunct partitioning. + * A not further specified partitioning on a key (hash-, or range partitioning, or some other scheme even). */ ANY_PARTITIONING, /** - * Constant indicating full replication of the data to each parallel instance. + *Full replication of the data to each parallel instance. */ FULL_REPLICATION, /** - * Constant indicating a forced even re-balancing. + * A forced even re-balancing. All partitions are guaranteed to have almost the same number of records. */ FORCED_REBALANCED, @@ -95,10 +98,10 @@ public enum PartitioningProperty { /** * Checks, if this property represents a partitioning that is computable. - * Computable partitionings can be recreated through an algorithm. If two sets of data are to + * A computable partitioning can be recreated through an algorithm. If two sets of data are to * be co-partitioned, it is crucial, that the partitioning schemes are computable. * <p> - * Examples for computable partitioning schemes are hash- or range-partitionings. An example for a non-computable + * Examples for computable partitioning schemes are hash- or range-partitioning. An example for a non-computable * partitioning is the implicit partitioning that exists though a globally unique key. * * @return True, if this enum constant is a re-computable partitioning. http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/RequestedLocalProperties.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/RequestedLocalProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/RequestedLocalProperties.java index 102a0f0..a9d14e8 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/RequestedLocalProperties.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/dataproperties/RequestedLocalProperties.java @@ -29,14 +29,16 @@ import org.apache.flink.optimizer.util.Utils; import org.apache.flink.runtime.operators.util.LocalStrategy; /** - * This class represents local properties of the data. A local property is a property that exists - * within the data of a single partition. + * This class represents the local properties of the data that are requested by an operator. + * Local properties are the properties within one partition. + * Operators request the local properties they need for correct execution. Here are some example local + * properties requested by certain operators: + * <ul> + * <li>"groupBy/reduce" will request the data to be grouped on the key fields.</li> + * <li>A sort-merge join will request the data from each input to be sorted on the respective join key.</li> + * </ul> */ public class RequestedLocalProperties implements Cloneable { - - public static final RequestedLocalProperties DEFAULT_PROPERTIES = null; - - // -------------------------------------------------------------------------------------------- private Ordering ordering; // order inside a partition, null if not ordered @@ -205,9 +207,9 @@ public class RequestedLocalProperties implements Cloneable { } /** - * Parameterizes the local strategy fields of a channel such that the channel produces the desired local properties. + * Parametrizes the local strategy fields of a channel such that the channel produces the desired local properties. * - * @param channel The channel to parameterize. + * @param channel The channel to parametrize. */ public void parameterizeChannel(Channel channel) { LocalProperties current = channel.getLocalProperties(); http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupCombineProperties.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupCombineProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupCombineProperties.java index eaf11ac..9ee56b0 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupCombineProperties.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupCombineProperties.java @@ -40,7 +40,7 @@ public final class AllGroupCombineProperties extends OperatorDescriptorSingle { @Override public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { - return new SingleInputPlanNode(node, "GroupCombine ("+node.getPactContract().getName()+")", in, DriverStrategy.ALL_GROUP_COMBINE); + return new SingleInputPlanNode(node, "GroupCombine ("+node.getOperator().getName()+")", in, DriverStrategy.ALL_GROUP_COMBINE); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupReduceProperties.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupReduceProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupReduceProperties.java index 55d6fbb..9efd8c7 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupReduceProperties.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupReduceProperties.java @@ -40,7 +40,7 @@ public final class AllGroupReduceProperties extends OperatorDescriptorSingle { @Override public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { - return new SingleInputPlanNode(node, "GroupReduce ("+node.getPactContract().getName()+")", in, DriverStrategy.ALL_GROUP_REDUCE); + return new SingleInputPlanNode(node, "GroupReduce ("+node.getOperator().getName()+")", in, DriverStrategy.ALL_GROUP_REDUCE); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupWithPartialPreGroupProperties.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupWithPartialPreGroupProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupWithPartialPreGroupProperties.java index 7d07e7d..b3c083a 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupWithPartialPreGroupProperties.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllGroupWithPartialPreGroupProperties.java @@ -46,7 +46,7 @@ public final class AllGroupWithPartialPreGroupProperties extends OperatorDescrip public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { if (in.getShipStrategy() == ShipStrategyType.FORWARD) { // locally connected, directly instantiate - return new SingleInputPlanNode(node, "GroupReduce ("+node.getPactContract().getName()+")", + return new SingleInputPlanNode(node, "GroupReduce ("+node.getOperator().getName()+")", in, DriverStrategy.ALL_GROUP_REDUCE); } else { // non forward case.plug in a combiner @@ -55,10 +55,10 @@ public final class AllGroupWithPartialPreGroupProperties extends OperatorDescrip // create an input node for combine with same DOP as input node GroupReduceNode combinerNode = ((GroupReduceNode) node).getCombinerUtilityNode(); - combinerNode.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism()); + combinerNode.setDegreeOfParallelism(in.getSource().getParallelism()); SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, - "Combine ("+node.getPactContract().getName()+")", toCombiner, DriverStrategy.ALL_GROUP_REDUCE_COMBINE); + "Combine ("+node.getOperator().getName()+")", toCombiner, DriverStrategy.ALL_GROUP_REDUCE_COMBINE); combiner.setCosts(new Costs(0, 0)); combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties()); @@ -67,7 +67,7 @@ public final class AllGroupWithPartialPreGroupProperties extends OperatorDescrip in.getShipStrategySortOrder(), in.getDataExchangeMode()); toReducer.setLocalStrategy(in.getLocalStrategy(), in.getLocalStrategyKeys(), in.getLocalStrategySortOrder()); - return new SingleInputPlanNode(node, "GroupReduce ("+node.getPactContract().getName()+")", + return new SingleInputPlanNode(node, "GroupReduce ("+node.getOperator().getName()+")", toReducer, DriverStrategy.ALL_GROUP_REDUCE); } } http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllReduceProperties.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllReduceProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllReduceProperties.java index 4f6a4fd..a172a60 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllReduceProperties.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/AllReduceProperties.java @@ -45,7 +45,7 @@ public final class AllReduceProperties extends OperatorDescriptorSingle { public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { if (in.getShipStrategy() == ShipStrategyType.FORWARD) { // locally connected, directly instantiate - return new SingleInputPlanNode(node, "Reduce ("+node.getPactContract().getName()+")", + return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")", in, DriverStrategy.ALL_REDUCE); } else { // non forward case.plug in a combiner @@ -54,10 +54,10 @@ public final class AllReduceProperties extends OperatorDescriptorSingle { // create an input node for combine with same DOP as input node ReduceNode combinerNode = ((ReduceNode) node).getCombinerUtilityNode(); - combinerNode.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism()); + combinerNode.setDegreeOfParallelism(in.getSource().getParallelism()); SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, - "Combine ("+node.getPactContract().getName()+")", toCombiner, DriverStrategy.ALL_REDUCE); + "Combine ("+node.getOperator().getName()+")", toCombiner, DriverStrategy.ALL_REDUCE); combiner.setCosts(new Costs(0, 0)); combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties()); @@ -67,7 +67,7 @@ public final class AllReduceProperties extends OperatorDescriptorSingle { toReducer.setLocalStrategy(in.getLocalStrategy(), in.getLocalStrategyKeys(), in.getLocalStrategySortOrder()); - return new SingleInputPlanNode(node, "Reduce ("+node.getPactContract().getName()+")", + return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")", toReducer, DriverStrategy.ALL_REDUCE); } } http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CartesianProductDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CartesianProductDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CartesianProductDescriptor.java index c46ce56..f48e297 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CartesianProductDescriptor.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CartesianProductDescriptor.java @@ -93,7 +93,7 @@ public abstract class CartesianProductDescriptor extends OperatorDescriptorDual @Override public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) { - return new DualInputPlanNode(node, "Cross("+node.getPactContract().getName()+")", in1, in2, getStrategy()); + return new DualInputPlanNode(node, "Cross("+node.getOperator().getName()+")", in1, in2, getStrategy()); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupDescriptor.java index a17063e..368944e 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupDescriptor.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupDescriptor.java @@ -215,7 +215,7 @@ public class CoGroupDescriptor extends OperatorDescriptorDual { inputOrders = tmp; } - return new DualInputPlanNode(node, "CoGroup ("+node.getPactContract().getName()+")", in1, in2, DriverStrategy.CO_GROUP, this.keys1, this.keys2, inputOrders); + return new DualInputPlanNode(node, "CoGroup ("+node.getOperator().getName()+")", in1, in2, DriverStrategy.CO_GROUP, this.keys1, this.keys2, inputOrders); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupWithSolutionSetFirstDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupWithSolutionSetFirstDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupWithSolutionSetFirstDescriptor.java index ffae66f..8e7edeb 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupWithSolutionSetFirstDescriptor.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CoGroupWithSolutionSetFirstDescriptor.java @@ -60,7 +60,7 @@ public class CoGroupWithSolutionSetFirstDescriptor extends CoGroupDescriptor { inputOrders = tmp; } - return new DualInputPlanNode(node, "CoGroup ("+node.getPactContract().getName()+")", in1, in2, DriverStrategy.CO_GROUP, this.keys1, this.keys2, inputOrders); + return new DualInputPlanNode(node, "CoGroup ("+node.getOperator().getName()+")", in1, in2, DriverStrategy.CO_GROUP, this.keys1, this.keys2, inputOrders); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CollectorMapDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CollectorMapDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CollectorMapDescriptor.java index 92f7474..bcd4d73 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CollectorMapDescriptor.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/CollectorMapDescriptor.java @@ -42,7 +42,7 @@ public class CollectorMapDescriptor extends OperatorDescriptorSingle { @Override public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { - return new SingleInputPlanNode(node, "Map ("+node.getPactContract().getName()+")", in, DriverStrategy.COLLECTOR_MAP); + return new SingleInputPlanNode(node, "Map ("+node.getOperator().getName()+")", in, DriverStrategy.COLLECTOR_MAP); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FilterDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FilterDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FilterDescriptor.java index e7ff210..81c823f 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FilterDescriptor.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FilterDescriptor.java @@ -41,7 +41,7 @@ public class FilterDescriptor extends OperatorDescriptorSingle { @Override public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { - return new SingleInputPlanNode(node, "Filter ("+node.getPactContract().getName()+")", in, DriverStrategy.FLAT_MAP); + return new SingleInputPlanNode(node, "Filter ("+node.getOperator().getName()+")", in, DriverStrategy.FLAT_MAP); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FlatMapDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FlatMapDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FlatMapDescriptor.java index 5384aa6..b915e45 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FlatMapDescriptor.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/FlatMapDescriptor.java @@ -42,7 +42,7 @@ public class FlatMapDescriptor extends OperatorDescriptorSingle { @Override public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { - return new SingleInputPlanNode(node, "FlatMap ("+node.getPactContract().getName()+")", in, DriverStrategy.FLAT_MAP); + return new SingleInputPlanNode(node, "FlatMap ("+node.getOperator().getName()+")", in, DriverStrategy.FLAT_MAP); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupCombineProperties.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupCombineProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupCombineProperties.java index d0e839a..b648386 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupCombineProperties.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupCombineProperties.java @@ -71,12 +71,12 @@ public final class GroupCombineProperties extends OperatorDescriptorSingle { @Override public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { - node.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism()); + node.setDegreeOfParallelism(in.getSource().getParallelism()); // sorting key info SingleInputPlanNode singleInputPlanNode = new SingleInputPlanNode( node, - "GroupCombine (" + node.getPactContract().getName() + ")", + "GroupCombine (" + node.getOperator().getName() + ")", in, // reuse the combine strategy also used in the group reduce DriverStrategy.SORTED_GROUP_COMBINE, this.keyList); http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceProperties.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceProperties.java index c66321d..ebd09f2 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceProperties.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceProperties.java @@ -85,7 +85,7 @@ public final class GroupReduceProperties extends OperatorDescriptorSingle { @Override public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { - return new SingleInputPlanNode(node, "GroupReduce ("+node.getPactContract().getName()+")", in, DriverStrategy.SORTED_GROUP_REDUCE, this.keyList); + return new SingleInputPlanNode(node, "GroupReduce ("+node.getOperator().getName()+")", in, DriverStrategy.SORTED_GROUP_REDUCE, this.keyList); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java index 1caee6c..c4f47d3 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java @@ -98,7 +98,7 @@ public final class GroupReduceWithCombineProperties extends OperatorDescriptorSi in.setLocalStrategy(LocalStrategy.COMBININGSORT, in.getLocalStrategyKeys(), in.getLocalStrategySortOrder()); } - return new SingleInputPlanNode(node, "Reduce("+node.getPactContract().getName()+")", in, + return new SingleInputPlanNode(node, "Reduce("+node.getOperator().getName()+")", in, DriverStrategy.SORTED_GROUP_REDUCE, this.keyList); } else { // non forward case. all local properties are killed anyways, so we can safely plug in a combiner @@ -107,9 +107,9 @@ public final class GroupReduceWithCombineProperties extends OperatorDescriptorSi // create an input node for combine with same DOP as input node GroupReduceNode combinerNode = ((GroupReduceNode) node).getCombinerUtilityNode(); - combinerNode.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism()); + combinerNode.setDegreeOfParallelism(in.getSource().getParallelism()); - SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getPactContract() + SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getOperator() .getName()+")", toCombiner, DriverStrategy.SORTED_GROUP_COMBINE); combiner.setCosts(new Costs(0, 0)); combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties()); @@ -124,7 +124,7 @@ public final class GroupReduceWithCombineProperties extends OperatorDescriptorSi toReducer.setLocalStrategy(LocalStrategy.COMBININGSORT, in.getLocalStrategyKeys(), in.getLocalStrategySortOrder()); - return new SingleInputPlanNode(node, "Reduce ("+node.getPactContract().getName()+")", + return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")", toReducer, DriverStrategy.SORTED_GROUP_REDUCE, this.keyList); } } http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java index a6c4500..fec72a9 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java @@ -79,7 +79,7 @@ public class HashJoinBuildFirstProperties extends AbstractJoinDescriptor { else { strategy = DriverStrategy.HYBRIDHASH_BUILD_FIRST; } - return new DualInputPlanNode(node, "Join("+node.getPactContract().getName()+")", in1, in2, strategy, this.keys1, this.keys2); + return new DualInputPlanNode(node, "Join("+node.getOperator().getName()+")", in1, in2, strategy, this.keys1, this.keys2); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildSecondProperties.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildSecondProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildSecondProperties.java index 79cb3cb..f9d1e6c 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildSecondProperties.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildSecondProperties.java @@ -78,7 +78,7 @@ public final class HashJoinBuildSecondProperties extends AbstractJoinDescriptor else { strategy = DriverStrategy.HYBRIDHASH_BUILD_SECOND; } - return new DualInputPlanNode(node, "Join ("+node.getPactContract().getName()+")", in1, in2, strategy, this.keys1, this.keys2); + return new DualInputPlanNode(node, "Join ("+node.getOperator().getName()+")", in1, in2, strategy, this.keys1, this.keys2); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapDescriptor.java index e55a728..9f14d2a 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapDescriptor.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapDescriptor.java @@ -41,7 +41,7 @@ public class MapDescriptor extends OperatorDescriptorSingle { @Override public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { - return new SingleInputPlanNode(node, "Map ("+node.getPactContract().getName()+")", in, DriverStrategy.MAP); + return new SingleInputPlanNode(node, "Map ("+node.getOperator().getName()+")", in, DriverStrategy.MAP); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapPartitionDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapPartitionDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapPartitionDescriptor.java index 8cef12d..1489097 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapPartitionDescriptor.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/MapPartitionDescriptor.java @@ -41,7 +41,7 @@ public class MapPartitionDescriptor extends OperatorDescriptorSingle { @Override public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { - return new SingleInputPlanNode(node, "MapPartition ("+node.getPactContract().getName()+")", in, DriverStrategy.MAP_PARTITION); + return new SingleInputPlanNode(node, "MapPartition ("+node.getOperator().getName()+")", in, DriverStrategy.MAP_PARTITION); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java index fe9302a..2bde29b 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java @@ -48,10 +48,10 @@ public final class PartialGroupProperties extends OperatorDescriptorSingle { @Override public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { // create in input node for combine with same DOP as input node - GroupReduceNode combinerNode = new GroupReduceNode((GroupReduceOperatorBase<?, ?, ?>) node.getPactContract()); - combinerNode.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism()); + GroupReduceNode combinerNode = new GroupReduceNode((GroupReduceOperatorBase<?, ?, ?>) node.getOperator()); + combinerNode.setDegreeOfParallelism(in.getSource().getParallelism()); - SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getPactContract().getName()+")", in, + SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getOperator().getName()+")", in, DriverStrategy.SORTED_GROUP_COMBINE); // sorting key info combiner.setDriverKeyInfo(in.getLocalStrategyKeys(), in.getLocalStrategySortOrder(), 0); http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java index 01096f0..5bb51f3 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java @@ -61,7 +61,7 @@ public final class ReduceProperties extends OperatorDescriptorSingle { if (in.getShipStrategy() == ShipStrategyType.FORWARD || (node.getBroadcastConnections() != null && !node.getBroadcastConnections().isEmpty())) { - return new SingleInputPlanNode(node, "Reduce ("+node.getPactContract().getName()+")", in, + return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")", in, DriverStrategy.SORTED_REDUCE, this.keyList); } else { @@ -71,10 +71,10 @@ public final class ReduceProperties extends OperatorDescriptorSingle { // create an input node for combine with same DOP as input node ReduceNode combinerNode = ((ReduceNode) node).getCombinerUtilityNode(); - combinerNode.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism()); + combinerNode.setDegreeOfParallelism(in.getSource().getParallelism()); SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, - "Combine ("+node.getPactContract().getName()+")", toCombiner, + "Combine ("+node.getOperator().getName()+")", toCombiner, DriverStrategy.SORTED_PARTIAL_REDUCE, this.keyList); combiner.setCosts(new Costs(0, 0)); @@ -85,7 +85,7 @@ public final class ReduceProperties extends OperatorDescriptorSingle { in.getShipStrategySortOrder(), in.getDataExchangeMode()); toReducer.setLocalStrategy(LocalStrategy.SORT, in.getLocalStrategyKeys(), in.getLocalStrategySortOrder()); - return new SingleInputPlanNode(node, "Reduce("+node.getPactContract().getName()+")", toReducer, + return new SingleInputPlanNode(node, "Reduce("+node.getOperator().getName()+")", toReducer, DriverStrategy.SORTED_REDUCE, this.keyList); } } http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java index afe7e8d..356836a 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java @@ -99,7 +99,7 @@ public class SortMergeJoinDescriptor extends AbstractJoinDescriptor { inputOrders = tmp; } - return new DualInputPlanNode(node, "Join("+node.getPactContract().getName()+")", in1, in2, DriverStrategy.MERGE, this.keys1, this.keys2, inputOrders); + return new DualInputPlanNode(node, "Join("+node.getOperator().getName()+")", in1, in2, DriverStrategy.MERGE, this.keys1, this.keys2, inputOrders); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BinaryUnionPlanNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BinaryUnionPlanNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BinaryUnionPlanNode.java index 093e968..bf22fb3 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BinaryUnionPlanNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BinaryUnionPlanNode.java @@ -42,7 +42,7 @@ public class BinaryUnionPlanNode extends DualInputPlanNode { this.nodeCosts = toSwapFrom.nodeCosts; this.cumulativeCosts = toSwapFrom.cumulativeCosts; - setDegreeOfParallelism(toSwapFrom.getDegreeOfParallelism()); + setParallelism(toSwapFrom.getParallelism()); } public BinaryUnionNode getOptimizerNode() { http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/Channel.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/Channel.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/Channel.java index 8a2398c..875d1c3 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/Channel.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/Channel.java @@ -37,7 +37,30 @@ import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.apache.flink.runtime.operators.util.LocalStrategy; /** - * A Channel is a data exchange between two operators. + * A Channel represents the result produced by an operator and the data exchange + * before the consumption by the target operator. + * + * The channel defines and tracks various properties and characteristics of the + * data set and data exchange. + * + * Data set characteristics: + * <ul> + * <li>The "global properties" of the data, i.e., how the data is distributed across + * partitions</li> + * <li>The "required global properties" of the data, i.e., the global properties that, if absent, + * would cause the program to return a wrong result.</li> + * <li>The "local properties" of the data, i.e., how the data is organized within a partition</li> + * <li>The "required local properties" of the data, i.e., the local properties that, if absent, + * would cause the program to return a wrong result.</li> + * </ul> + * + * Data exchange parameters: + * <ul> + * <li>The "ship strategy", i.e., whether to forward the data, shuffle it, broadcast it, ...</li> + * <li>The "ship keys", which are the positions of the key fields in the exchanged records.</li> + * <li>The "data exchange mode", which defines whether to pipeline or batch the exchange</li> + * <li>Several more...</li> + * </ul> */ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<PlanNode> { http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/DualInputPlanNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/DualInputPlanNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/DualInputPlanNode.java index eea9b67..01c56dd 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/DualInputPlanNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/DualInputPlanNode.java @@ -81,10 +81,10 @@ public class DualInputPlanNode extends PlanNode { this.sortOrders = driverSortOrders; if (this.input1.getShipStrategy() == ShipStrategyType.BROADCAST) { - this.input1.setReplicationFactor(getDegreeOfParallelism()); + this.input1.setReplicationFactor(getParallelism()); } if (this.input2.getShipStrategy() == ShipStrategyType.BROADCAST) { - this.input2.setReplicationFactor(getDegreeOfParallelism()); + this.input2.setReplicationFactor(getParallelism()); } mergeBranchPlanMaps(input1.getSource(), input2.getSource()); http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java index e565909..6f634fb 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.optimizer.plan; import java.util.ArrayList; @@ -73,7 +72,7 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan private double relativeMemoryPerSubTask; // the amount of memory dedicated to each task, in bytes - private int degreeOfParallelism; + private int parallelism; private boolean pFlag; // flag for the internal pruning algorithm @@ -86,7 +85,7 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan this.nodeName = nodeName; this.driverStrategy = strategy; - this.degreeOfParallelism = template.getDegreeOfParallelism(); + this.parallelism = template.getParallelism(); // check, if there is branch at this node. if yes, this candidate must be associated with // the branching template node. @@ -134,21 +133,21 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan // -------------------------------------------------------------------------------------------- /** - * Gets the optimizer's pact node for which this plan candidate node was created. + * Gets the node from the optimizer DAG for which this plan candidate node was created. * - * @return The template optimizer's node. + * @return The optimizer's DAG node. */ public OptimizerNode getOriginalOptimizerNode() { return this.template; } /** - * Gets the pact contract this node represents in the plan. + * Gets the program operator that this node represents in the plan. * - * @return The pact contract this node represents in the plan. + * @return The program operator this node represents in the plan. */ - public Operator<?> getPactContract() { - return this.template.getPactContract(); + public Operator<?> getProgramOperator() { + return this.template.getOperator(); } /** @@ -252,7 +251,7 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan return null; } else { Costs result = cumulativeCosts.clone(); - if (this.template != null && this.template.getOutgoingConnections() != null) { + if (this.template.getOutgoingConnections() != null) { int outDegree = this.template.getOutgoingConnections().size(); if (outDegree > 0) { result.divideBy(outDegree); @@ -302,12 +301,12 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan } } - public void setDegreeOfParallelism(int parallelism) { - this.degreeOfParallelism = parallelism; + public void setParallelism(int parallelism) { + this.parallelism = parallelism; } - public int getDegreeOfParallelism() { - return this.degreeOfParallelism; + public int getParallelism() { + return this.parallelism; } public long getGuaranteedAvailableMemory() { @@ -514,7 +513,7 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan @Override public String toString() { - return this.template.getName() + " \"" + getPactContract().getName() + "\" : " + this.driverStrategy + + return this.template.getName() + " \"" + getProgramOperator().getName() + "\" : " + this.driverStrategy + " [[ " + this.globalProps + " ]] [[ " + this.localProps + " ]]"; } http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SingleInputPlanNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SingleInputPlanNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SingleInputPlanNode.java index cefd704..b928be7 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SingleInputPlanNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/SingleInputPlanNode.java @@ -81,7 +81,7 @@ public class SingleInputPlanNode extends PlanNode { } if (this.input.getShipStrategy() == ShipStrategyType.BROADCAST) { - this.input.setReplicationFactor(getDegreeOfParallelism()); + this.input.setReplicationFactor(getParallelism()); } final PlanNode predNode = input.getSource(); http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java index 4083a2a..6f918c0 100644 --- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java +++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java @@ -39,7 +39,7 @@ import org.apache.flink.optimizer.dag.BulkIterationNode; import org.apache.flink.optimizer.dag.DataSinkNode; import org.apache.flink.optimizer.dag.DataSourceNode; import org.apache.flink.optimizer.dag.OptimizerNode; -import org.apache.flink.optimizer.dag.PactConnection; +import org.apache.flink.optimizer.dag.DagConnection; import org.apache.flink.optimizer.dag.TempMode; import org.apache.flink.optimizer.dag.WorksetIterationNode; import org.apache.flink.optimizer.dataproperties.GlobalProperties; @@ -231,18 +231,18 @@ public class PlanJSONDumpGenerator { String contents; if (n instanceof DataSinkNode) { type = "sink"; - contents = n.getPactContract().toString(); + contents = n.getOperator().toString(); } else if (n instanceof DataSourceNode) { type = "source"; - contents = n.getPactContract().toString(); + contents = n.getOperator().toString(); } else if (n instanceof BulkIterationNode) { type = "bulk_iteration"; - contents = n.getPactContract().getName(); + contents = n.getOperator().getName(); } else if (n instanceof WorksetIterationNode) { type = "workset_iteration"; - contents = n.getPactContract().getName(); + contents = n.getOperator().getName(); } else if (n instanceof BinaryUnionNode) { type = "pact"; @@ -250,7 +250,7 @@ public class PlanJSONDumpGenerator { } else { type = "pact"; - contents = n.getPactContract().getName(); + contents = n.getOperator().getName(); } contents = StringUtils.showControlCharacters(contents); @@ -277,7 +277,7 @@ public class PlanJSONDumpGenerator { // degree of parallelism writer.print(",\n\t\t\"parallelism\": \"" - + (n.getDegreeOfParallelism() >= 1 ? n.getDegreeOfParallelism() : "default") + "\""); + + (n.getParallelism() >= 1 ? n.getParallelism() : "default") + "\""); // output node predecessors Iterator<? extends DumpableConnection<?>> inConns = node.getDumpableInputs().iterator(); @@ -294,10 +294,10 @@ public class PlanJSONDumpGenerator { writer.print(inputNum == 0 ? "\n" : ",\n"); if (inputNum == 0) { child1name += child1name.length() > 0 ? ", " : ""; - child1name += source.getOptimizerNode().getPactContract().getName(); + child1name += source.getOptimizerNode().getOperator().getName(); } else if (inputNum == 1) { child2name += child2name.length() > 0 ? ", " : ""; - child2name = source.getOptimizerNode().getPactContract().getName(); + child2name = source.getOptimizerNode().getOperator().getName(); } // output predecessor id @@ -310,7 +310,7 @@ public class PlanJSONDumpGenerator { // output shipping strategy and channel type final Channel channel = (inConn instanceof Channel) ? (Channel) inConn : null; final ShipStrategyType shipType = channel != null ? channel.getShipStrategy() : - ((PactConnection) inConn).getShipStrategy(); + ((DagConnection) inConn).getShipStrategy(); String shipStrategy = null; if (shipType != null) { @@ -588,8 +588,8 @@ public class PlanJSONDumpGenerator { } // output the node compiler hints - if (n.getPactContract().getCompilerHints() != null) { - CompilerHints hints = n.getPactContract().getCompilerHints(); + if (n.getOperator().getCompilerHints() != null) { + CompilerHints hints = n.getOperator().getCompilerHints(); CompilerHints defaults = new CompilerHints(); String size = hints.getOutputSize() == defaults.getOutputSize() ? "(none)" : String.valueOf(hints.getOutputSize());