http://git-wip-us.apache.org/repos/asf/flink/blob/a9150b30/flink-compiler/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
new file mode 100644
index 0000000..04bc527
--- /dev/null
+++ b/flink-compiler/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -0,0 +1,1578 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.plantranslate;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.aggregators.AggregatorRegistry;
+import org.apache.flink.api.common.aggregators.AggregatorWithName;
+import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
+import org.apache.flink.api.common.aggregators.LongSumAggregator;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
+import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.api.common.operators.util.UserCodeWrapper;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.dag.TempMode;
+import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
+import org.apache.flink.optimizer.plan.BulkPartialSolutionPlanNode;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.IterationPlanNode;
+import org.apache.flink.optimizer.plan.NAryUnionPlanNode;
+import org.apache.flink.optimizer.plan.NamedChannel;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.plan.SolutionSetPlanNode;
+import org.apache.flink.optimizer.plan.SourcePlanNode;
+import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
+import org.apache.flink.optimizer.plan.WorksetPlanNode;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.io.network.DataExchangeMode;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.iterative.convergence.WorksetEmptyConvergenceCriterion;
+import org.apache.flink.runtime.iterative.task.IterationHeadPactTask;
+import org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask;
+import org.apache.flink.runtime.iterative.task.IterationSynchronizationSinkTask;
+import org.apache.flink.runtime.iterative.task.IterationTailPactTask;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.InputFormatVertex;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.OutputFormatVertex;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.operators.CoGroupDriver;
+import org.apache.flink.runtime.operators.CoGroupWithSolutionSetFirstDriver;
+import org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDriver;
+import org.apache.flink.runtime.operators.DataSinkTask;
+import org.apache.flink.runtime.operators.DataSourceTask;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.JoinWithSolutionSetFirstDriver;
+import org.apache.flink.runtime.operators.JoinWithSolutionSetSecondDriver;
+import org.apache.flink.runtime.operators.MatchDriver;
+import org.apache.flink.runtime.operators.NoOpDriver;
+import org.apache.flink.runtime.operators.RegularPactTask;
+import org.apache.flink.runtime.operators.chaining.ChainedDriver;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.util.LocalStrategy;
+import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Visitor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * This component translates the optimizer's resulting {@link org.apache.flink.optimizer.plan.OptimizedPlan}
+ * to a {@link org.apache.flink.runtime.jobgraph.JobGraph}. The translation is not strictly one-to-one,
+ * because some nodes from the OptimizedPlan are collapsed into a single job vertex.
+ *
+ * This translation does not make any decisions or assumptions. All decisions about the degrees of freedom
+ * in the execution of the job are made by the Optimizer, so this translation is a deterministic mapping.
+ *
+ * The basic method of operation is a top-down traversal over the plan graph. On the way down, job vertices
+ * are created for the plan nodes; on the way back up, the nodes connect to their predecessors.
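+ *
+ * A minimal usage sketch (the OptimizedPlan is assumed to come from the Optimizer):
+ * <pre>{@code
+ * OptimizedPlan plan = ...; // result of the Optimizer
+ * JobGraph jobGraph = new JobGraphGenerator().compileJobGraph(plan);
+ * }</pre>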
+ */
+public class JobGraphGenerator implements Visitor<PlanNode> {
+
+	public static final String MERGE_ITERATION_AUX_TASKS_KEY = "compiler.merge-iteration-aux";
+
+	private static final boolean mergeIterationAuxTasks = GlobalConfiguration.getBoolean(MERGE_ITERATION_AUX_TASKS_KEY, false);
+
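+	// sentinel that marks a plan node as already visited without creating a task of its own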
+	private static final TaskInChain ALREADY_VISITED_PLACEHOLDER = new TaskInChain(null, null, null);
+
+	// ------------------------------------------------------------------------
+
+	private Map<PlanNode, AbstractJobVertex> vertices; // a map from optimizer nodes to job vertices
+
+	private Map<PlanNode, TaskInChain> chainedTasks; // a map from optimizer nodes to chained tasks
+
+ private Map<IterationPlanNode, IterationDescriptor> iterations;
+
+ private List<TaskInChain> chainedTasksInSequence;
+
+	private List<AbstractJobVertex> auxVertices; // auxiliary vertices which are added during job graph generation
+
+ private final int defaultMaxFan;
+
+ private final float defaultSortSpillingThreshold;
+
+ private int iterationIdEnumerator = 1;
+
+	private IterationPlanNode currentIteration; // the currently enclosing iteration
+
+	private List<IterationPlanNode> iterationStack; // stack of enclosing iterations
+
+ private SlotSharingGroup sharingGroup;
+
+	// ------------------------------------------------------------------------
+
+ /**
+	 * Creates a new job graph generator that uses the default values for its resource configuration.
+ */
+ public JobGraphGenerator() {
+ this.defaultMaxFan = ConfigConstants.DEFAULT_SPILLING_MAX_FAN;
+		this.defaultSortSpillingThreshold = ConfigConstants.DEFAULT_SORT_SPILLING_THRESHOLD;
+ }
+
+ public JobGraphGenerator(Configuration config) {
+		this.defaultMaxFan = config.getInteger(ConfigConstants.DEFAULT_SPILLING_MAX_FAN_KEY,
+				ConfigConstants.DEFAULT_SPILLING_MAX_FAN);
+		this.defaultSortSpillingThreshold = config.getFloat(ConfigConstants.DEFAULT_SORT_SPILLING_THRESHOLD_KEY,
+				ConfigConstants.DEFAULT_SORT_SPILLING_THRESHOLD);
+ }
+
+	/**
+	 * Translates a {@link org.apache.flink.optimizer.plan.OptimizedPlan} into a
+	 * {@link org.apache.flink.runtime.jobgraph.JobGraph}.
+	 *
+	 * @param program Optimized plan that is translated into a JobGraph.
+	 * @return JobGraph generated from the plan.
+	 */
+ public JobGraph compileJobGraph(OptimizedPlan program) {
+ this.vertices = new HashMap<PlanNode, AbstractJobVertex>();
+ this.chainedTasks = new HashMap<PlanNode, TaskInChain>();
+ this.chainedTasksInSequence = new ArrayList<TaskInChain>();
+ this.auxVertices = new ArrayList<AbstractJobVertex>();
+		this.iterations = new HashMap<IterationPlanNode, IterationDescriptor>();
+ this.iterationStack = new ArrayList<IterationPlanNode>();
+
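+		// all vertices of the job go into one slot sharing group, so subtasks of different operators can share slots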
+ this.sharingGroup = new SlotSharingGroup();
+
+ // this starts the traversal that generates the job graph
+ program.accept(this);
+
+		// sanity check that we are not somehow in an iteration at the end
+		if (this.currentIteration != null) {
+			throw new CompilerException("The graph translation ended prematurely, leaving an unclosed iteration.");
+		}
+
+ // finalize the iterations
+		for (IterationDescriptor iteration : this.iterations.values()) {
+			if (iteration.getIterationNode() instanceof BulkIterationPlanNode) {
+				finalizeBulkIteration(iteration);
+			} else if (iteration.getIterationNode() instanceof WorksetIterationPlanNode) {
+				finalizeWorksetIteration(iteration);
+			} else {
+				throw new CompilerException();
+			}
+		}
+
+		// now that the traversal is done, we have the chained tasks write their configs into their
+		// parents' configurations
+		for (TaskInChain tic : this.chainedTasksInSequence) {
+			TaskConfig t = new TaskConfig(tic.getContainingVertex().getConfiguration());
+			t.addChainedTask(tic.getChainedTask(), tic.getTaskConfig(), tic.getTaskName());
+		}
+
+		// create the job graph object
+		JobGraph graph = new JobGraph(program.getJobName());
+		graph.setNumberOfExecutionRetries(program.getOriginalPactPlan().getNumberOfExecutionRetries());
+		graph.setAllowQueuedScheduling(false);
+
+ // add vertices to the graph
+ for (AbstractJobVertex vertex : this.vertices.values()) {
+ graph.addVertex(vertex);
+ }
+
+ for (AbstractJobVertex vertex : this.auxVertices) {
+ graph.addVertex(vertex);
+ vertex.setSlotSharingGroup(sharingGroup);
+ }
+
+		// add registered cache files into the job configuration
+		for (Entry<String, DistributedCacheEntry> e : program.getOriginalPactPlan().getCachedFiles()) {
+			DistributedCache.writeFileInfoToConfig(e.getKey(), e.getValue(), graph.getJobConfiguration());
+		}
+
+		try {
+			InstantiationUtil.writeObjectToConfig(
+					program.getOriginalPactPlan().getExecutionConfig(),
+					graph.getJobConfiguration(),
+					ExecutionConfig.CONFIG_KEY);
+		} catch (IOException e) {
+			throw new RuntimeException("Config object could not be written to Job Configuration: " + e);
+		}
+
+ // release all references again
+ this.vertices = null;
+ this.chainedTasks = null;
+ this.chainedTasksInSequence = null;
+ this.auxVertices = null;
+ this.iterations = null;
+ this.iterationStack = null;
+
+ // return job graph
+ return graph;
+ }
+
+	/**
+	 * This method implements the pre-visiting during a depth-first traversal. It creates the job vertex and
+	 * sets the local strategy.
+	 *
+	 * @param node
+	 *        The node that is currently processed.
+	 * @return True, if the visitor should descend to the node's children, false if not.
+	 * @see org.apache.flink.util.Visitor#preVisit(org.apache.flink.util.Visitable)
+	 */
+ @Override
+ public boolean preVisit(PlanNode node) {
+		// check if we have visited this node before. in non-tree graphs, this happens
+		if (this.vertices.containsKey(node) || this.chainedTasks.containsKey(node) || this.iterations.containsKey(node)) {
+			// return false to prevent further descent
+			return false;
+		}
+
+ // the vertex to be created for the current node
+ final AbstractJobVertex vertex;
+ try {
+			if (node instanceof SinkPlanNode) {
+				vertex = createDataSinkVertex((SinkPlanNode) node);
+			}
+			else if (node instanceof SourcePlanNode) {
+				vertex = createDataSourceVertex((SourcePlanNode) node);
+			}
+			else if (node instanceof BulkIterationPlanNode) {
+				BulkIterationPlanNode iterationNode = (BulkIterationPlanNode) node;
+				// for the bulk iteration, we skip creating anything for now. we create the graph
+				// for the step function in the post visit.
+
+				// check that the root of the step function has the same DOP as the iteration.
+				// because the tail must have the same DOP as the head, we can only merge the last
+				// operator with the tail, if they have the same DOP. not merging is currently not
+				// implemented
+				PlanNode root = iterationNode.getRootOfStepFunction();
+				if (root.getParallelism() != node.getParallelism()) {
+					throw new CompilerException("Error: The final operator of the step " +
+							"function has a different degree of parallelism than the iteration operator itself.");
+				}
+
+				IterationDescriptor descr = new IterationDescriptor(iterationNode, this.iterationIdEnumerator++);
+				this.iterations.put(iterationNode, descr);
+				vertex = null;
+			}
+			else if (node instanceof WorksetIterationPlanNode) {
+				WorksetIterationPlanNode iterationNode = (WorksetIterationPlanNode) node;
+
+				// we have the same constraints as for the bulk iteration
+				PlanNode nextWorkSet = iterationNode.getNextWorkSetPlanNode();
+				PlanNode solutionSetDelta = iterationNode.getSolutionSetDeltaPlanNode();
+
+				if (nextWorkSet.getParallelism() != node.getParallelism()) {
+					throw new CompilerException("It is currently not supported that the final operator of the step " +
+							"function has a different degree of parallelism than the iteration operator itself.");
+				}
+				if (solutionSetDelta.getParallelism() != node.getParallelism()) {
+					throw new CompilerException("It is currently not supported that the final operator of the step " +
+							"function has a different degree of parallelism than the iteration operator itself.");
+				}
+
+				IterationDescriptor descr = new IterationDescriptor(iterationNode, this.iterationIdEnumerator++);
+				this.iterations.put(iterationNode, descr);
+				vertex = null;
+			}
+			else if (node instanceof SingleInputPlanNode) {
+				vertex = createSingleInputVertex((SingleInputPlanNode) node);
+			}
+			else if (node instanceof DualInputPlanNode) {
+				vertex = createDualInputVertex((DualInputPlanNode) node);
+			}
+			else if (node instanceof NAryUnionPlanNode) {
+				// skip the union for now
+				vertex = null;
+			}
+			else if (node instanceof BulkPartialSolutionPlanNode) {
+				// create a head node (or not, if it is merged into its successor)
+				vertex = createBulkIterationHead((BulkPartialSolutionPlanNode) node);
+			}
+			else if (node instanceof SolutionSetPlanNode) {
+				// this represents an access into the solution set index.
+				// we do not create a vertex for the solution set here (we create the head at the workset place holder)
+
+				// we adjust the joins / cogroups that go into the solution set here
+				for (Channel c : node.getOutgoingChannels()) {
+					DualInputPlanNode target = (DualInputPlanNode) c.getTarget();
+					AbstractJobVertex accessingVertex = this.vertices.get(target);
+					TaskConfig conf = new TaskConfig(accessingVertex.getConfiguration());
+					int inputNum = c == target.getInput1() ? 0 : c == target.getInput2() ? 1 : -1;
+
+					// sanity checks
+					if (inputNum == -1) {
+						throw new CompilerException();
+					}
+
+					// adjust the driver
+					if (conf.getDriver().equals(MatchDriver.class)) {
+						conf.setDriver(inputNum == 0 ? JoinWithSolutionSetFirstDriver.class : JoinWithSolutionSetSecondDriver.class);
+					}
+					else if (conf.getDriver().equals(CoGroupDriver.class)) {
+						conf.setDriver(inputNum == 0 ? CoGroupWithSolutionSetFirstDriver.class : CoGroupWithSolutionSetSecondDriver.class);
+					}
+					else {
+						throw new CompilerException("Found join with solution set using incompatible operator (only Join/CoGroup are valid).");
+					}
+				}
+
+				// make sure we do not visit this node again. for that, we add an 'already seen' entry into one of the sets
+				this.chainedTasks.put(node, ALREADY_VISITED_PLACEHOLDER);
+
+				vertex = null;
+			}
+			else if (node instanceof WorksetPlanNode) {
+				// create the iteration head here
+				vertex = createWorksetIterationHead((WorksetPlanNode) node);
+			}
+			else {
+				throw new CompilerException("Unrecognized node type: " + node.getClass().getName());
+			}
+		}
+		catch (Exception e) {
+			throw new CompilerException("Error translating node '" + node + "': " + e.getMessage(), e);
+		}
+
+		// check if a vertex was created, or if it was chained or skipped
+		if (vertex != null) {
+			// set degree of parallelism
+			int pd = node.getParallelism();
+			vertex.setParallelism(pd);
+
+			vertex.setSlotSharingGroup(sharingGroup);
+
+			// check whether this vertex is part of an iteration step function
+			if (this.currentIteration != null) {
+				// check that the task has the same DOP as the iteration as such
+				PlanNode iterationNode = (PlanNode) this.currentIteration;
+				if (iterationNode.getParallelism() < pd) {
+					throw new CompilerException("Error: All functions that are part of an iteration must have the same, or a lower, degree-of-parallelism than the iteration operator.");
+				}
+
+				// store the id of the iterations the step functions participate in
+				IterationDescriptor descr = this.iterations.get(this.currentIteration);
+				new TaskConfig(vertex.getConfiguration()).setIterationId(descr.getId());
+			}
+
+			// store in the map
+			this.vertices.put(node, vertex);
+		}
+
+		// returning true causes deeper descent
+		return true;
+	}
+
+	/**
+	 * This method implements the post-visit during the depth-first traversal. When the post-visit happens,
+	 * all of the descendants have been processed, so this method connects all of the current node's
+	 * predecessors to the current node.
+	 *
+	 * @param node
+	 *        The node currently processed during the post-visit.
+	 * @see org.apache.flink.util.Visitor#postVisit(org.apache.flink.util.Visitable)
+	 */
+ @Override
+ public void postVisit(PlanNode node) {
+ try {
+			// --------- check special cases for which we handle the post visit differently ----------
+
+			// skip data source nodes (they have no inputs)
+			// also, do nothing for union nodes, we connect them later when gathering the inputs for a task
+			// solution sets have no input. the initial solution set input is connected when the iteration node is in its postVisit
+			if (node instanceof SourcePlanNode || node instanceof NAryUnionPlanNode || node instanceof SolutionSetPlanNode) {
+				return;
+			}
+
+			// check if we have an iteration. in that case, translate the step function now
+			if (node instanceof IterationPlanNode) {
+				// prevent nested iterations
+				if (node.isOnDynamicPath()) {
+					throw new CompilerException("Nested Iterations are not possible at the moment!");
+				}
+
+				// if we recursively go into an iteration (because the constant path of one iteration contains
+				// another one), we push the current one onto the stack
+				if (this.currentIteration != null) {
+					this.iterationStack.add(this.currentIteration);
+				}
+
+				this.currentIteration = (IterationPlanNode) node;
+				this.currentIteration.acceptForStepFunction(this);
+
+				// pop the current iteration from the stack
+				if (this.iterationStack.isEmpty()) {
+					this.currentIteration = null;
+				} else {
+					this.currentIteration = this.iterationStack.remove(this.iterationStack.size() - 1);
+				}
+
+				// inputs for the initial bulk partial solution or the initial workset are already connected
+				// to the iteration head in the head's post visit. connect the initial solution set now.
+				if (node instanceof WorksetIterationPlanNode) {
+					// connect the initial solution set
+					WorksetIterationPlanNode wsNode = (WorksetIterationPlanNode) node;
+					AbstractJobVertex headVertex = this.iterations.get(wsNode).getHeadTask();
+					TaskConfig headConfig = new TaskConfig(headVertex.getConfiguration());
+					int inputIndex = headConfig.getDriverStrategy().getNumInputs();
+					headConfig.setIterationHeadSolutionSetInputIndex(inputIndex);
+					translateChannel(wsNode.getInitialSolutionSetInput(), inputIndex, headVertex, headConfig, false);
+				}
+
+				return;
+			}
+
+			final AbstractJobVertex targetVertex = this.vertices.get(node);
+
+
+			// --------- Main Path: Translation of channels ----------
+			//
+			// There are two paths of translation: one for chained tasks (or merged tasks in general),
+			// which do not have their own task vertex, and one for tasks that have their own vertex,
+			// or are the primary task in a vertex (to which the others are chained).
+
+			// check whether this node has its own task, or is merged with another one
+			if (targetVertex == null) {
+				// the node's task is merged with another task. it is either chained, or a merged head vertex
+				// from an iteration
+				final TaskInChain chainedTask;
+				if ((chainedTask = this.chainedTasks.get(node)) != null) {
+					// Chained Task. Sanity check first...
+					final Iterator<Channel> inConns = node.getInputs().iterator();
+					if (!inConns.hasNext()) {
+						throw new CompilerException("Bug: Found chained task with no input.");
+					}
+					final Channel inConn = inConns.next();
+
+					if (inConns.hasNext()) {
+						throw new CompilerException("Bug: Found a chained task with more than one input!");
+					}
+					if (inConn.getLocalStrategy() != null && inConn.getLocalStrategy() != LocalStrategy.NONE) {
+						throw new CompilerException("Bug: Found a chained task with an input local strategy.");
+					}
+					if (inConn.getShipStrategy() != null && inConn.getShipStrategy() != ShipStrategyType.FORWARD) {
+						throw new CompilerException("Bug: Found a chained task with an input ship strategy other than FORWARD.");
+					}
+
+					AbstractJobVertex container = chainedTask.getContainingVertex();
+
+					if (container == null) {
+						final PlanNode sourceNode = inConn.getSource();
+						container = this.vertices.get(sourceNode);
+						if (container == null) {
+							// predecessor is itself chained
+							container = this.chainedTasks.get(sourceNode).getContainingVertex();
+							if (container == null) {
+								throw new IllegalStateException("Bug: Chained task predecessor has not been assigned its containing vertex.");
+							}
+						} else {
+							// predecessor is a proper task job vertex and this is the first chained task. add a forward connection entry.
+							new TaskConfig(container.getConfiguration()).addOutputShipStrategy(ShipStrategyType.FORWARD);
+						}
+						chainedTask.setContainingVertex(container);
+					}
+
+					// add info about the input serializer type
+					chainedTask.getTaskConfig().setInputSerializer(inConn.getSerializer(), 0);
+
+					// update the name of the container task
+					String containerTaskName = container.getName();
+					if (containerTaskName.startsWith("CHAIN ")) {
+						container.setName(containerTaskName + " -> " + chainedTask.getTaskName());
+					} else {
+						container.setName("CHAIN " + containerTaskName + " -> " + chainedTask.getTaskName());
+					}
+
+					this.chainedTasksInSequence.add(chainedTask);
+					return;
+				}
+				else if (node instanceof BulkPartialSolutionPlanNode ||
+						node instanceof WorksetPlanNode)
+				{
+					// merged iteration head task. the task that the head is merged with will take care of it
+					return;
+				} else {
+					throw new CompilerException("Bug: Unrecognized merged task vertex.");
+				}
+			}
+
+			// -------- Here, we translate non-chained tasks -------------
+
+			if (this.currentIteration != null) {
+				AbstractJobVertex head = this.iterations.get(this.currentIteration).getHeadTask();
+				// the head may still be null if we descend into the static parts first
+				if (head != null) {
+					targetVertex.setStrictlyCoLocatedWith(head);
+				}
+			}
+
+			// create the config that will contain all the descriptions of the inputs
+			final TaskConfig targetVertexConfig = new TaskConfig(targetVertex.getConfiguration());
+
+			// get the inputs. if this node is the head of an iteration, we obtain the inputs from the
+			// enclosing iteration node, because the inputs are the initial inputs to the iteration.
+			final Iterator<Channel> inConns;
+			if (node instanceof BulkPartialSolutionPlanNode) {
+				inConns = ((BulkPartialSolutionPlanNode) node).getContainingIterationNode().getInputs().iterator();
+				// because the partial solution has its own vertex, it has only one (logical) input.
+				// note this in the task configuration
+				targetVertexConfig.setIterationHeadPartialSolutionOrWorksetInputIndex(0);
+			} else if (node instanceof WorksetPlanNode) {
+				WorksetPlanNode wspn = (WorksetPlanNode) node;
+				// input that is the initial workset
+				inConns = Collections.singleton(wspn.getContainingIterationNode().getInput2()).iterator();
+
+				// because we have a stand-alone (non-merged) workset iteration head, the initial workset will
+				// be input 0 and the solution set will be input 1
+				targetVertexConfig.setIterationHeadPartialSolutionOrWorksetInputIndex(0);
+				targetVertexConfig.setIterationHeadSolutionSetInputIndex(1);
+			} else {
+				inConns = node.getInputs().iterator();
+			}
+			if (!inConns.hasNext()) {
+				throw new CompilerException("Bug: Found a non-source task with no input.");
+			}
+
+			int inputIndex = 0;
+			while (inConns.hasNext()) {
+				Channel input = inConns.next();
+				inputIndex += translateChannel(input, inputIndex, targetVertex, targetVertexConfig, false);
+			}
+			// broadcast variables
+			int broadcastInputIndex = 0;
+			for (NamedChannel broadcastInput : node.getBroadcastInputs()) {
+				int broadcastInputIndexDelta = translateChannel(broadcastInput, broadcastInputIndex, targetVertex, targetVertexConfig, true);
+				targetVertexConfig.setBroadcastInputName(broadcastInput.getName(), broadcastInputIndex);
+				targetVertexConfig.setBroadcastInputSerializer(broadcastInput.getSerializer(), broadcastInputIndex);
+				broadcastInputIndex += broadcastInputIndexDelta;
+			}
+		} catch (Exception e) {
+			throw new CompilerException(
+					"An error occurred while translating the optimized plan to a Nephele JobGraph: " + e.getMessage(), e);
+		}
+	}
+
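+	/**
+	 * Translates one logical input channel into the physical connections of the job graph.
+	 * Returns the number of input gates that were consumed: 1 for a regular input (a union
+	 * counts as one logical input) and 0 for solution set inputs, which are resolved as a
+	 * local index access rather than as a vertex-to-vertex connection.
+	 */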
+	private int translateChannel(Channel input, int inputIndex, AbstractJobVertex targetVertex,
+			TaskConfig targetVertexConfig, boolean isBroadcast) throws Exception
+	{
+		final PlanNode inputPlanNode = input.getSource();
+		final Iterator<Channel> allInChannels;
+
+		if (inputPlanNode instanceof NAryUnionPlanNode) {
+			allInChannels = ((NAryUnionPlanNode) inputPlanNode).getListOfInputs().iterator();
+		}
+		else if (inputPlanNode instanceof BulkPartialSolutionPlanNode) {
+			if (this.vertices.get(inputPlanNode) == null) {
+				// merged iteration head
+				final BulkPartialSolutionPlanNode pspn = (BulkPartialSolutionPlanNode) inputPlanNode;
+				final BulkIterationPlanNode iterationNode = pspn.getContainingIterationNode();
+
+				// check if the iteration's input is a union
+				if (iterationNode.getInput().getSource() instanceof NAryUnionPlanNode) {
+					allInChannels = ((NAryUnionPlanNode) iterationNode.getInput().getSource()).getInputs().iterator();
+				} else {
+					allInChannels = Collections.singletonList(iterationNode.getInput()).iterator();
+				}
+
+				// also, set the index of the gate with the partial solution
+				targetVertexConfig.setIterationHeadPartialSolutionOrWorksetInputIndex(inputIndex);
+			} else {
+				// standalone iteration head
+				allInChannels = Collections.singletonList(input).iterator();
+			}
+		} else if (inputPlanNode instanceof WorksetPlanNode) {
+			if (this.vertices.get(inputPlanNode) == null) {
+				// merged iteration head
+				final WorksetPlanNode wspn = (WorksetPlanNode) inputPlanNode;
+				final WorksetIterationPlanNode iterationNode = wspn.getContainingIterationNode();
+
+				// check if the iteration's input is a union
+				if (iterationNode.getInput2().getSource() instanceof NAryUnionPlanNode) {
+					allInChannels = ((NAryUnionPlanNode) iterationNode.getInput2().getSource()).getInputs().iterator();
+				} else {
+					allInChannels = Collections.singletonList(iterationNode.getInput2()).iterator();
+				}
+
+				// also, set the index of the gate with the initial workset
+				targetVertexConfig.setIterationHeadPartialSolutionOrWorksetInputIndex(inputIndex);
+			} else {
+				// standalone iteration head
+				allInChannels = Collections.singletonList(input).iterator();
+			}
+		} else if (inputPlanNode instanceof SolutionSetPlanNode) {
+			// for now, skip connections with the solution set node, as this is a local index access
+			// (later to be parameterized here) rather than a vertex connection
+			return 0;
+		} else {
+			allInChannels = Collections.singletonList(input).iterator();
+		}
+
+		// check that the type serializer is consistent
+		TypeSerializerFactory<?> typeSerFact = null;
+
+		// accounting for channels on the dynamic path
+		int numChannelsTotal = 0;
+		int numChannelsDynamicPath = 0;
+		int numDynamicSenderTasksTotal = 0;
+
+		// expand the channel to all the union channels, in case there is a union operator at its source
+		while (allInChannels.hasNext()) {
+			final Channel inConn = allInChannels.next();
+
+			// sanity check the common serializer
+			if (typeSerFact == null) {
+				typeSerFact = inConn.getSerializer();
+			} else if (!typeSerFact.equals(inConn.getSerializer())) {
+				throw new CompilerException("Conflicting types in union operator.");
+			}
+
+			final PlanNode sourceNode = inConn.getSource();
+			AbstractJobVertex sourceVertex = this.vertices.get(sourceNode);
+			TaskConfig sourceVertexConfig;
+
+			if (sourceVertex == null) {
+				// this predecessor is chained to another task or an iteration
+				final TaskInChain chainedTask;
+				final IterationDescriptor iteration;
+				if ((chainedTask = this.chainedTasks.get(sourceNode)) != null) {
+					// push chained task
+					if (chainedTask.getContainingVertex() == null) {
+						throw new IllegalStateException("Bug: Chained task has not been assigned its containing vertex when connecting.");
+					}
+					sourceVertex = chainedTask.getContainingVertex();
+					sourceVertexConfig = chainedTask.getTaskConfig();
+				} else if ((iteration = this.iterations.get(sourceNode)) != null) {
+					// predecessor is an iteration
+					sourceVertex = iteration.getHeadTask();
+					sourceVertexConfig = iteration.getHeadFinalResultConfig();
+				} else {
+					throw new CompilerException("Bug: Could not resolve source node for a channel.");
+				}
+			} else {
+				// predecessor has its own vertex
+				sourceVertexConfig = new TaskConfig(sourceVertex.getConfiguration());
+			}
+			DistributionPattern pattern = connectJobVertices(
+					inConn, inputIndex, sourceVertex, sourceVertexConfig, targetVertex, targetVertexConfig, isBroadcast);
+
+			// accounting on channels and senders
+			numChannelsTotal++;
+			if (inConn.isOnDynamicPath()) {
+				numChannelsDynamicPath++;
+				numDynamicSenderTasksTotal += getNumberOfSendersPerReceiver(pattern,
+						sourceVertex.getParallelism(), targetVertex.getParallelism());
+			}
+		}
+
+		// for the iterations, check that the number of dynamic channels is the same as the number
+		// of channels for this logical input. this condition is violated at the moment if there
+		// is a union between nodes on the static and nodes on the dynamic path
+		if (numChannelsDynamicPath > 0 && numChannelsTotal != numChannelsDynamicPath) {
+			throw new CompilerException("Error: It is currently not supported to union between dynamic and static path in an iteration.");
+		}
+		if (numDynamicSenderTasksTotal > 0) {
+			if (isBroadcast) {
+				targetVertexConfig.setBroadcastGateIterativeWithNumberOfEventsUntilInterrupt(inputIndex, numDynamicSenderTasksTotal);
+			} else {
+				targetVertexConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(inputIndex, numDynamicSenderTasksTotal);
+			}
+		}
+
+		// the local strategy is added only once. in the non-union case that is the actual edge,
+		// in the union case it is the edge between the union and the target node
+		addLocalInfoFromChannelToConfig(input, targetVertexConfig, inputIndex, isBroadcast);
+		return 1;
+	}
+
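+	/**
+	 * Computes how many sender subtasks feed each receiver subtask of a channel. For ALL_TO_ALL
+	 * patterns, every sender connects to every receiver; for POINTWISE patterns, only the 1:1,
+	 * n:1, and 1:n parallelism combinations are supported within iterations.
+	 */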
+	private int getNumberOfSendersPerReceiver(DistributionPattern pattern, int numSenders, int numReceivers) {
+		if (pattern == DistributionPattern.ALL_TO_ALL) {
+			return numSenders;
+		} else if (pattern == DistributionPattern.POINTWISE) {
+			if (numSenders != numReceivers) {
+				if (numReceivers == 1) {
+					return numSenders;
+				}
+				else if (numSenders == 1) {
+					return 1;
+				}
+				else {
+					throw new CompilerException("Error: A changing degree of parallelism is currently " +
+							"not supported between tasks within an iteration.");
+				}
+			} else {
+				return 1;
+			}
+		} else {
+			throw new CompilerException("Unknown distribution pattern for channels: " + pattern);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	// Methods for creating individual vertices
+	// ------------------------------------------------------------------------
+
+	private AbstractJobVertex createSingleInputVertex(SingleInputPlanNode node) throws CompilerException {
+		final String taskName = node.getNodeName();
+		final DriverStrategy ds = node.getDriverStrategy();
+
+		// check, whether chaining is possible
+		boolean chaining = false;
+		{
+			Channel inConn = node.getInput();
+			PlanNode pred = inConn.getSource();
+			chaining = ds.getPushChainDriverClass() != null &&
+					!(pred instanceof NAryUnionPlanNode) &&	// first op after union is stand-alone, because union is merged
+					!(pred instanceof BulkPartialSolutionPlanNode) &&	// partial solution merges anyways
+					!(pred instanceof WorksetPlanNode) &&	// workset merges anyways
+					!(pred instanceof IterationPlanNode) &&	// cannot chain with iteration heads currently
+					inConn.getShipStrategy() == ShipStrategyType.FORWARD &&
+					inConn.getLocalStrategy() == LocalStrategy.NONE &&
+					pred.getOutgoingChannels().size() == 1 &&
+					node.getParallelism() == pred.getParallelism() &&
+					node.getBroadcastInputs().isEmpty();
+
+			// cannot chain the nodes that produce the next workset or the next solution set, if they are not
+			// in a tail
+			if (this.currentIteration != null && this.currentIteration instanceof WorksetIterationPlanNode &&
+					node.getOutgoingChannels().size() > 0)
+			{
+				WorksetIterationPlanNode wspn = (WorksetIterationPlanNode) this.currentIteration;
+				if (wspn.getSolutionSetDeltaPlanNode() == pred || wspn.getNextWorkSetPlanNode() == pred) {
+					chaining = false;
+				}
+			}
+			// cannot chain the nodes that produce the next workset in a bulk iteration if a termination criterion follows
+			if (this.currentIteration != null && this.currentIteration instanceof BulkIterationPlanNode)
+			{
+				BulkIterationPlanNode wspn = (BulkIterationPlanNode) this.currentIteration;
+				if (node == wspn.getRootOfTerminationCriterion() && wspn.getRootOfStepFunction() == pred) {
+					chaining = false;
+				} else if (node.getOutgoingChannels().size() > 0 && (wspn.getRootOfStepFunction() == pred ||
+						wspn.getRootOfTerminationCriterion() == pred)) {
+					chaining = false;
+				}
+			}
+		}
+
+		final AbstractJobVertex vertex;
+		final TaskConfig config;
+
+		if (chaining) {
+			vertex = null;
+			config = new TaskConfig(new Configuration());
+			this.chainedTasks.put(node, new TaskInChain(ds.getPushChainDriverClass(), config, taskName));
+		} else {
+			// create task vertex
+			vertex = new AbstractJobVertex(taskName);
+			vertex.setInvokableClass((this.currentIteration != null && node.isOnDynamicPath()) ? IterationIntermediatePactTask.class : RegularPactTask.class);
+
+			config = new TaskConfig(vertex.getConfiguration());
+			config.setDriver(ds.getDriverClass());
+		}
+
+		// set user code
+		config.setStubWrapper(node.getProgramOperator().getUserCodeWrapper());
+		config.setStubParameters(node.getProgramOperator().getParameters());
+
+		// set the driver strategy
+		config.setDriverStrategy(ds);
+		for (int i = 0; i < ds.getNumRequiredComparators(); i++) {
+			config.setDriverComparator(node.getComparator(i), i);
+		}
+		// assign memory, file-handles, etc.
+		assignDriverResources(node, config);
+		return vertex;
+	}
+
+	private AbstractJobVertex createDualInputVertex(DualInputPlanNode node) throws CompilerException {
+		final String taskName = node.getNodeName();
+		final DriverStrategy ds = node.getDriverStrategy();
+		final AbstractJobVertex vertex = new AbstractJobVertex(taskName);
+		final TaskConfig config = new TaskConfig(vertex.getConfiguration());
+		vertex.setInvokableClass((this.currentIteration != null && node.isOnDynamicPath()) ? IterationIntermediatePactTask.class : RegularPactTask.class);
+
+		// set user code
+		config.setStubWrapper(node.getProgramOperator().getUserCodeWrapper());
+		config.setStubParameters(node.getProgramOperator().getParameters());
+
+		// set the driver strategy
+		config.setDriver(ds.getDriverClass());
+		config.setDriverStrategy(ds);
+		if (node.getComparator1() != null) {
+			config.setDriverComparator(node.getComparator1(), 0);
+		}
+		if (node.getComparator2() != null) {
+			config.setDriverComparator(node.getComparator2(), 1);
+		}
+		if (node.getPairComparator() != null) {
+			config.setDriverPairComparator(node.getPairComparator());
+		}
+
+		// assign memory, file-handles, etc.
+		assignDriverResources(node, config);
+		return vertex;
+	}
+
+	private InputFormatVertex createDataSourceVertex(SourcePlanNode node) throws CompilerException {
+		final InputFormatVertex vertex = new InputFormatVertex(node.getNodeName());
+		final TaskConfig config = new TaskConfig(vertex.getConfiguration());
+
+		vertex.setInvokableClass(DataSourceTask.class);
+		vertex.setFormatDescription(getDescriptionForUserCode(node.getProgramOperator().getUserCodeWrapper()));
+
+		// set user code
+		config.setStubWrapper(node.getProgramOperator().getUserCodeWrapper());
+		config.setStubParameters(node.getProgramOperator().getParameters());
+
+		config.setOutputSerializer(node.getSerializer());
+		return vertex;
+	}
+
+	private AbstractJobVertex createDataSinkVertex(SinkPlanNode node) throws CompilerException {
+		final OutputFormatVertex vertex = new OutputFormatVertex(node.getNodeName());
+		final TaskConfig config = new TaskConfig(vertex.getConfiguration());
+
+		vertex.setInvokableClass(DataSinkTask.class);
+		vertex.setFormatDescription(getDescriptionForUserCode(node.getProgramOperator().getUserCodeWrapper()));
+
+		// set user code
+		config.setStubWrapper(node.getProgramOperator().getUserCodeWrapper());
+		config.setStubParameters(node.getProgramOperator().getParameters());
+
+		return vertex;
+	}
+
+	private AbstractJobVertex createBulkIterationHead(BulkPartialSolutionPlanNode pspn) {
+		// get the bulk iteration that corresponds to this partial solution node
+		final BulkIterationPlanNode iteration = pspn.getContainingIterationNode();
+
+		// check whether we need an individual vertex for the partial solution, or whether we
+		// attach ourselves to the vertex of the parent node. We can combine the head with a node of
+		// the step function, if
+		// 1) There is one parent that the partial solution connects to via a forward pattern and no
+		//    local strategy
+		// 2) DOP and the number of subtasks per instance does not change
+		// 3) That successor is not a union
+		// 4) That successor is not itself the last node of the step function
+		// 5) There is no local strategy on the edge for the initial partial solution, as
+		//    this translates to a local strategy that would only be executed in the first iteration
+
+		final boolean merge;
+		if (mergeIterationAuxTasks && pspn.getOutgoingChannels().size() == 1) {
+			final Channel c = pspn.getOutgoingChannels().get(0);
+			final PlanNode successor = c.getTarget();
+			merge = c.getShipStrategy() == ShipStrategyType.FORWARD &&
+					c.getLocalStrategy() == LocalStrategy.NONE &&
+					c.getTempMode() == TempMode.NONE &&
+					successor.getParallelism() == pspn.getParallelism() &&
+					!(successor instanceof NAryUnionPlanNode) &&
+					successor != iteration.getRootOfStepFunction() &&
+					iteration.getInput().getLocalStrategy() == LocalStrategy.NONE;
+		} else {
+			merge = false;
+		}
+
+		// create or adopt the head vertex
+		final AbstractJobVertex toReturn;
+		final AbstractJobVertex headVertex;
+		final TaskConfig headConfig;
+		if (merge) {
+			final PlanNode successor = pspn.getOutgoingChannels().get(0).getTarget();
+			headVertex = (AbstractJobVertex) this.vertices.get(successor);
+
+			if (headVertex == null) {
+				throw new CompilerException(
+						"Bug: Trying to merge solution set with its successor, but the successor has not been created.");
+			}
+
+			// reset the vertex type to iteration head
+			headVertex.setInvokableClass(IterationHeadPactTask.class);
+			headConfig = new TaskConfig(headVertex.getConfiguration());
+			toReturn = null;
+		} else {
+			// instantiate the head vertex and give it a no-op driver as the driver strategy.
+			// everything else happens in the post visit, after the input (the initial partial solution)
+			// is connected.
+			headVertex = new AbstractJobVertex("PartialSolution (" + iteration.getNodeName() + ")");
+			headVertex.setInvokableClass(IterationHeadPactTask.class);
+			headConfig = new TaskConfig(headVertex.getConfiguration());
+			headConfig.setDriver(NoOpDriver.class);
+			toReturn = headVertex;
+		}
+
+		// register the head vertex with the iteration descriptor
+		IterationDescriptor descr = this.iterations.get(iteration);
+		if (descr == null) {
+			throw new CompilerException("Bug: Iteration descriptor was not created when translating the iteration node.");
+		}
+		descr.setHeadTask(headVertex, headConfig);
+
+		return toReturn;
+	}
+
+	private AbstractJobVertex createWorksetIterationHead(WorksetPlanNode wspn) {
+		// get the workset iteration that corresponds to this workset node
+		final WorksetIterationPlanNode iteration = wspn.getContainingIterationNode();
+
+		// check whether we need an individual vertex for the workset, or whether we
+		// attach ourselves to the vertex of the parent node. We can combine the head with a node of
+		// the step function, if
+		// 1) There is one parent that the workset connects to via a forward pattern and no
+		//    local strategy
+		// 2) DOP and the number of subtasks per instance does not change
+		// 3) That successor is not a union
+		// 4) That successor is not itself the last node of the step function
+		// 5) There is no local strategy on the edge for the initial workset, as
+		//    this translates to a local strategy that would only be executed in the first superstep
+
+		final boolean merge;
+		if (mergeIterationAuxTasks && wspn.getOutgoingChannels().size() == 1) {
+			final Channel c = wspn.getOutgoingChannels().get(0);
+			final PlanNode successor = c.getTarget();
+			merge = c.getShipStrategy() == ShipStrategyType.FORWARD &&
+					c.getLocalStrategy() == LocalStrategy.NONE &&
+					c.getTempMode() == TempMode.NONE &&
+					successor.getParallelism() == wspn.getParallelism() &&
+					!(successor instanceof NAryUnionPlanNode) &&
+					successor != iteration.getNextWorkSetPlanNode() &&
+					iteration.getInitialWorksetInput().getLocalStrategy() == LocalStrategy.NONE;
+		} else {
+			merge = false;
+		}
+
+		// create or adopt the head vertex
+		final AbstractJobVertex toReturn;
+		final AbstractJobVertex headVertex;
+		final TaskConfig headConfig;
+		if (merge) {
+			final PlanNode successor = wspn.getOutgoingChannels().get(0).getTarget();
+			headVertex = (AbstractJobVertex) this.vertices.get(successor);
+
+			if (headVertex == null) {
+				throw new CompilerException(
+						"Bug: Trying to merge solution set with its successor, but the successor has not been created.");
+			}
+
+			// reset the vertex type to iteration head
+			headVertex.setInvokableClass(IterationHeadPactTask.class);
+			headConfig = new TaskConfig(headVertex.getConfiguration());
+			toReturn = null;
+		} else {
+			// instantiate the head vertex and give it a no-op driver as the driver strategy.
+			// everything else happens in the post visit, after the input (the initial workset)
+			// is connected.
+			headVertex = new AbstractJobVertex("IterationHead(" + iteration.getNodeName() + ")");
+			headVertex.setInvokableClass(IterationHeadPactTask.class);
+			headConfig = new TaskConfig(headVertex.getConfiguration());
+			headConfig.setDriver(NoOpDriver.class);
+			toReturn = headVertex;
+		}
+
+		headConfig.setSolutionSetUnmanaged(iteration.getIterationNode().getIterationContract().isSolutionSetUnManaged());
+
+		// register the head vertex with the iteration descriptor
+		IterationDescriptor descr = this.iterations.get(iteration);
+		if (descr == null) {
+			throw new CompilerException("Bug: Iteration descriptor was not created when translating the iteration node.");
+		}
+		descr.setHeadTask(headVertex, headConfig);
+
+		return toReturn;
+	}
+
+	private void assignDriverResources(PlanNode node, TaskConfig config) {
+		final double relativeMem = node.getRelativeMemoryPerSubTask();
+		if (relativeMem > 0) {
+			config.setRelativeMemoryDriver(relativeMem);
+			config.setFilehandlesDriver(this.defaultMaxFan);
+			config.setSpillingThresholdDriver(this.defaultSortSpillingThreshold);
+		}
+	}
+
+	private void assignLocalStrategyResources(Channel c, TaskConfig config, int inputNum) {
+		if (c.getRelativeMemoryLocalStrategy() > 0) {
+			config.setRelativeMemoryInput(inputNum, c.getRelativeMemoryLocalStrategy());
+			config.setFilehandlesInput(inputNum, this.defaultMaxFan);
+			config.setSpillingThresholdInput(inputNum, this.defaultSortSpillingThreshold);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	// Connecting Vertices
+	// ------------------------------------------------------------------------
+
+	/**
+	 * NOTE: The channels for the global and local strategies are different if we connect a union. The global strategy
+	 * channel is then the channel into the union node, and the local strategy channel is the one from the union to the
+	 * actual target operator.
+ *
+ * @param channel
+ * @param inputNumber
+ * @param sourceVertex
+ * @param sourceConfig
+ * @param targetVertex
+ * @param targetConfig
+ * @param isBroadcast
+ * @throws CompilerException
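+	 * @return The distribution pattern used to connect the source and target vertices.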
+ */
+	private DistributionPattern connectJobVertices(Channel channel, int inputNumber,
+			final AbstractJobVertex sourceVertex, final TaskConfig sourceConfig,
+			final AbstractJobVertex targetVertex, final TaskConfig targetConfig, boolean isBroadcast)
+		throws CompilerException
+	{
+		// ------------ connect the vertices to the job graph --------------
+		final DistributionPattern distributionPattern;
+
+		switch (channel.getShipStrategy()) {
+			case FORWARD:
+				distributionPattern = DistributionPattern.POINTWISE;
+				break;
+			case PARTITION_RANDOM:
+			case BROADCAST:
+			case PARTITION_HASH:
+			case PARTITION_CUSTOM:
+			case PARTITION_RANGE:
+			case PARTITION_FORCED_REBALANCE:
+				distributionPattern = DistributionPattern.ALL_TO_ALL;
+				break;
+			default:
+				throw new RuntimeException("Unknown runtime ship strategy: " + channel.getShipStrategy());
+		}
+
+		final ResultPartitionType resultType;
+
+		switch (channel.getDataExchangeMode()) {
+
+			case PIPELINED:
+				resultType = ResultPartitionType.PIPELINED;
+				break;
+
+			case BATCH:
+				// BLOCKING results are currently not supported in closed loop iterations
+				//
+				// See https://issues.apache.org/jira/browse/FLINK-1713 for details
+				resultType = channel.getSource().isOnDynamicPath()
+						? ResultPartitionType.PIPELINED
+						: ResultPartitionType.BLOCKING;
+				break;
+
+			case PIPELINE_WITH_BATCH_FALLBACK:
+				throw new UnsupportedOperationException("Data exchange mode " +
+						channel.getDataExchangeMode() + " currently not supported.");
+
+			default:
+				throw new UnsupportedOperationException("Unknown data exchange mode.");
+		}
+
+		targetVertex.connectNewDataSetAsInput(sourceVertex, distributionPattern, resultType);
+
+		// -------------- configure the source task's ship strategy in the task config --------------
+		final int outputIndex = sourceConfig.getNumOutputs();
+		sourceConfig.addOutputShipStrategy(channel.getShipStrategy());
+		if (outputIndex == 0) {
+			sourceConfig.setOutputSerializer(channel.getSerializer());
+		}
+		if (channel.getShipStrategyComparator() != null) {
+			sourceConfig.setOutputComparator(channel.getShipStrategyComparator(), outputIndex);
+		}
+
+		if (channel.getShipStrategy() == ShipStrategyType.PARTITION_RANGE) {
+
+			final DataDistribution dataDistribution = channel.getDataDistribution();
+			if (dataDistribution != null) {
+				sourceConfig.setOutputDataDistribution(dataDistribution, outputIndex);
+			} else {
+				throw new RuntimeException("Range partitioning requires a data distribution.");
+				// TODO: inject code and configuration for automatic histogram generation
+			}
+		}
+
+		if (channel.getShipStrategy() == ShipStrategyType.PARTITION_CUSTOM) {
+			if (channel.getPartitioner() != null) {
+				sourceConfig.setOutputPartitioner(channel.getPartitioner(), outputIndex);
+			} else {
+				throw new CompilerException("The ship strategy was set to custom partitioning, but no partitioner was set.");
+			}
+		}
+
+		// ---------------- configure the receiver -------------------
+		if (isBroadcast) {
+			targetConfig.addBroadcastInputToGroup(inputNumber);
+		} else {
+			targetConfig.addInputToGroup(inputNumber);
+		}
+		return distributionPattern;
+	}
+
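+	/**
+	 * Writes the channel's receiver-side information (serializer, local strategy, and
+	 * materialization / caching settings) into the task config of the receiving vertex.
+	 */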
+	private void addLocalInfoFromChannelToConfig(Channel channel, TaskConfig config, int inputNum, boolean isBroadcastChannel) {
+		// serializer
+		if (isBroadcastChannel) {
+			config.setBroadcastInputSerializer(channel.getSerializer(), inputNum);
+
+			if (channel.getLocalStrategy() != LocalStrategy.NONE || (channel.getTempMode() != null && channel.getTempMode() != TempMode.NONE)) {
+				throw new CompilerException("Found local strategy or temp mode on a broadcast variable channel.");
+			} else {
+				return;
+			}
+		} else {
+			config.setInputSerializer(channel.getSerializer(), inputNum);
+		}
+
+		// local strategy
+		if (channel.getLocalStrategy() != LocalStrategy.NONE) {
+			config.setInputLocalStrategy(inputNum, channel.getLocalStrategy());
+			if (channel.getLocalStrategyComparator() != null) {
+				config.setInputComparator(channel.getLocalStrategyComparator(), inputNum);
+			}
+		}
+
+		assignLocalStrategyResources(channel, config, inputNum);
+
+		// materialization / caching
+		if (channel.getTempMode() != null) {
+			final TempMode tm = channel.getTempMode();
+
+			boolean needsMemory = false;
+			// Don't add a pipeline breaker if the data exchange is already blocking.
+			if (tm.breaksPipeline() && channel.getDataExchangeMode() != DataExchangeMode.BATCH) {
+				config.setInputAsynchronouslyMaterialized(inputNum, true);
+				needsMemory = true;
+			}
+			if (tm.isCached()) {
+				config.setInputCached(inputNum, true);
+				needsMemory = true;
+			}
+
+			if (needsMemory) {
+				// sanity check
+				if (tm == null || tm == TempMode.NONE || channel.getRelativeTempMemory() <= 0) {
+					throw new CompilerException("Bug in compiler: Inconsistent description of input materialization.");
+				}
+				config.setRelativeInputMaterializationMemory(inputNum, channel.getRelativeTempMemory());
+			}
+		}
+	}
+
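+	/**
+	 * Finalizes a bulk iteration: writes the final outputs and the sync gate into the head config,
+	 * creates the synchronization sink task, turns the step function's root (and, if present, the
+	 * termination criterion's root) into iteration tails, and registers the aggregators.
+	 */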
+	private void finalizeBulkIteration(IterationDescriptor descr) {
+
+		final BulkIterationPlanNode bulkNode = (BulkIterationPlanNode) descr.getIterationNode();
+		final AbstractJobVertex headVertex = descr.getHeadTask();
+		final TaskConfig headConfig = new TaskConfig(headVertex.getConfiguration());
+		final TaskConfig headFinalOutputConfig = descr.getHeadFinalResultConfig();
+
+		// ------------ finalize the head config with the final outputs and the sync gate ------------
+		final int numStepFunctionOuts = headConfig.getNumOutputs();
+		final int numFinalOuts = headFinalOutputConfig.getNumOutputs();
+
+		if (numStepFunctionOuts == 0) {
+			throw new CompilerException("The iteration has no operation inside the step function.");
+		}
+
+		headConfig.setIterationHeadFinalOutputConfig(headFinalOutputConfig);
+		headConfig.setIterationHeadIndexOfSyncOutput(numStepFunctionOuts + numFinalOuts);
+		final double relativeMemForBackChannel = bulkNode.getRelativeMemoryPerSubTask();
+		if (relativeMemForBackChannel <= 0) {
+			throw new CompilerException("Bug: No memory has been assigned to the iteration back channel.");
+		}
+		headConfig.setRelativeBackChannelMemory(relativeMemForBackChannel);
+
+		// --------------------------- create the sync task ---------------------------
+		final AbstractJobVertex sync = new AbstractJobVertex("Sync(" + bulkNode.getNodeName() + ")");
+		sync.setInvokableClass(IterationSynchronizationSinkTask.class);
+		sync.setParallelism(1);
+		this.auxVertices.add(sync);
+
+		final TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
+		syncConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, headVertex.getParallelism());
+
+		// set the number of iterations / the convergence criterion for the sync
+		final int maxNumIterations = bulkNode.getIterationNode().getIterationContract().getMaximumNumberOfIterations();
+		if (maxNumIterations < 1) {
+			throw new CompilerException("Cannot create bulk iteration with unspecified maximum number of iterations.");
+		}
+		syncConfig.setNumberOfIterations(maxNumIterations);
+
+		// connect the sync task
+		sync.connectNewDataSetAsInput(headVertex, DistributionPattern.POINTWISE);
+
+		// ----------------------------- create the iteration tail ------------------------------
+
+		final PlanNode rootOfTerminationCriterion = bulkNode.getRootOfTerminationCriterion();
+		final PlanNode rootOfStepFunction = bulkNode.getRootOfStepFunction();
+		final TaskConfig tailConfig;
+
+		AbstractJobVertex rootOfStepFunctionVertex = (AbstractJobVertex) this.vertices.get(rootOfStepFunction);
+		if (rootOfStepFunctionVertex == null) {
+			// last op is chained
+			final TaskInChain taskInChain = this.chainedTasks.get(rootOfStepFunction);
+			if (taskInChain == null) {
+				throw new CompilerException("Bug: Tail of step function not found as vertex or chained task.");
+			}
+			rootOfStepFunctionVertex = (AbstractJobVertex) taskInChain.getContainingVertex();
+
+			// the fake channel is statically typed to pact record. no data is sent over this channel anyways.
+			tailConfig = taskInChain.getTaskConfig();
+		} else {
+			tailConfig = new TaskConfig(rootOfStepFunctionVertex.getConfiguration());
+		}
+
+		tailConfig.setIsWorksetUpdate();
+
+		// No following termination criterion
+		if (rootOfStepFunction.getOutgoingChannels().isEmpty()) {
+
+			rootOfStepFunctionVertex.setInvokableClass(IterationTailPactTask.class);
+
+			tailConfig.setOutputSerializer(bulkNode.getSerializerForIterationChannel());
+		}
+
+		// create the fake output task for the termination criterion, if needed
+		final TaskConfig tailConfigOfTerminationCriterion;
+		// If we have a termination criterion and it is not an intermediate node
+		if (rootOfTerminationCriterion != null && rootOfTerminationCriterion.getOutgoingChannels().isEmpty()) {
+			AbstractJobVertex rootOfTerminationCriterionVertex = (AbstractJobVertex) this.vertices.get(rootOfTerminationCriterion);
+
+			if (rootOfTerminationCriterionVertex == null) {
+				// last op is chained
+				final TaskInChain taskInChain = this.chainedTasks.get(rootOfTerminationCriterion);
+				if (taskInChain == null) {
+					throw new CompilerException("Bug: Tail of termination criterion not found as vertex or chained task.");
+				}
+				rootOfTerminationCriterionVertex = (AbstractJobVertex) taskInChain.getContainingVertex();
+
+				// the fake channel is statically typed to pact record. no data is sent over this channel anyways.
+				tailConfigOfTerminationCriterion = taskInChain.getTaskConfig();
+			} else {
+				tailConfigOfTerminationCriterion = new TaskConfig(rootOfTerminationCriterionVertex.getConfiguration());
+			}
+
+			rootOfTerminationCriterionVertex.setInvokableClass(IterationTailPactTask.class);
+			// Hack
+			tailConfigOfTerminationCriterion.setIsSolutionSetUpdate();
+			tailConfigOfTerminationCriterion.setOutputSerializer(bulkNode.getSerializerForIterationChannel());
+
+			// tell the head that it needs to wait for the solution set updates
+			headConfig.setWaitForSolutionSetUpdate();
+		}
+
+		// ------------------- register the aggregators -------------------
+		AggregatorRegistry aggs = bulkNode.getIterationNode().getIterationContract().getAggregators();
+		Collection<AggregatorWithName<?>> allAggregators = aggs.getAllRegisteredAggregators();
+
+		headConfig.addIterationAggregators(allAggregators);
+		syncConfig.addIterationAggregators(allAggregators);
+
+		String convAggName = aggs.getConvergenceCriterionAggregatorName();
+		ConvergenceCriterion<?> convCriterion = aggs.getConvergenceCriterion();
+
+		if (convCriterion != null || convAggName != null) {
+			if (convCriterion == null) {
+				throw new CompilerException("Error: Convergence criterion aggregator set, but criterion is null.");
+			}
+			if (convAggName == null) {
+				throw new CompilerException("Error: Aggregator convergence criterion set, but aggregator is null.");
+			}
+
+			syncConfig.setConvergenceCriterion(convAggName, convCriterion);
+		}
+	}
+
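+	/**
+	 * Finalizes a workset (delta) iteration: configures the head's back channel and solution set
+	 * memory, creates the synchronization sink task, and sets up the tails for the next workset
+	 * and the solution set delta.
+	 */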
+	private void finalizeWorksetIteration(IterationDescriptor descr) {
+		final WorksetIterationPlanNode iterNode = (WorksetIterationPlanNode) descr.getIterationNode();
+		final AbstractJobVertex headVertex = descr.getHeadTask();
+		final TaskConfig headConfig = new TaskConfig(headVertex.getConfiguration());
+		final TaskConfig headFinalOutputConfig = descr.getHeadFinalResultConfig();
+
+		// ------------ finalize the head config with the final outputs and the sync gate ------------
+		{
+			final int numStepFunctionOuts = headConfig.getNumOutputs();
+			final int numFinalOuts = headFinalOutputConfig.getNumOutputs();
+
+			if (numStepFunctionOuts == 0) {
+				throw new CompilerException("The workset iteration has no operation on the workset inside the step function.");
+			}
+
+			headConfig.setIterationHeadFinalOutputConfig(headFinalOutputConfig);
+			headConfig.setIterationHeadIndexOfSyncOutput(numStepFunctionOuts + numFinalOuts);
+
+			final double relativeMemory = iterNode.getRelativeMemoryPerSubTask();
+			if (relativeMemory <= 0) {
+				throw new CompilerException("Bug: No memory has been assigned to the workset iteration.");
+			}
+
+			headConfig.setIsWorksetIteration();
+			// the iteration's memory is split evenly between the back channel and the solution set
+			headConfig.setRelativeBackChannelMemory(relativeMemory / 2);
+			headConfig.setRelativeSolutionSetMemory(relativeMemory / 2);
+
+			// set the solution set serializer and comparator
+			headConfig.setSolutionSetSerializer(iterNode.getSolutionSetSerializer());
+			headConfig.setSolutionSetComparator(iterNode.getSolutionSetComparator());
+		}
+
+		// --------------------------- create the sync task ---------------------------
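+		// The sync task is the central synchronization barrier of the iteration: it runs
+		// with parallelism 1 and waits for one end-of-superstep event from each parallel
+		// head subtask before the next superstep may start.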
+		final TaskConfig syncConfig;
+		{
+			final AbstractJobVertex sync = new AbstractJobVertex("Sync (" + iterNode.getNodeName() + ")");
+			sync.setInvokableClass(IterationSynchronizationSinkTask.class);
+			sync.setParallelism(1);
+			this.auxVertices.add(sync);
+
+			syncConfig = new TaskConfig(sync.getConfiguration());
+			syncConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, headVertex.getParallelism());
+
+			// set the number of iterations / the convergence criterion for the sync
+			final int maxNumIterations = iterNode.getIterationNode().getIterationContract().getMaximumNumberOfIterations();
+			if (maxNumIterations < 1) {
+				throw new CompilerException("Cannot create workset iteration with unspecified maximum number of iterations.");
+			}
+			syncConfig.setNumberOfIterations(maxNumIterations);
+
+			// connect the sync task
+			sync.connectNewDataSetAsInput(headVertex, DistributionPattern.POINTWISE);
+		}
+
+		// ----------------------------- create the iteration tails -----------------------------
+		// ----------------------- for next workset and solution set delta -----------------------
+
+		{
+			// we have three possible cases:
+			// 1) Two tails, one for workset update, one for solution set update
+			// 2) One tail for workset update, solution set update happens in an intermediate task
+			// 3) One tail for solution set update, workset update happens in an intermediate task
+
+			final PlanNode nextWorksetNode = iterNode.getNextWorkSetPlanNode();
+			final PlanNode solutionDeltaNode = iterNode.getSolutionSetDeltaPlanNode();
+
+			final boolean hasWorksetTail = nextWorksetNode.getOutgoingChannels().isEmpty();
+			final boolean hasSolutionSetTail = (!iterNode.isImmediateSolutionSetUpdate()) || (!hasWorksetTail);
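+			// a dedicated solution set tail is needed unless the solution set is updated
+			// immediately by an intermediate task while the next-workset operator forms the tail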
+
+			{
+				// get the vertex for the workset update
+				final TaskConfig worksetTailConfig;
+				AbstractJobVertex nextWorksetVertex = (AbstractJobVertex) this.vertices.get(nextWorksetNode);
+				if (nextWorksetVertex == null) {
+					// nextWorksetVertex is chained
+					TaskInChain taskInChain = this.chainedTasks.get(nextWorksetNode);
+					if (taskInChain == null) {
+						throw new CompilerException("Bug: Next workset node not found as vertex or chained task.");
+					}
+					nextWorksetVertex = (AbstractJobVertex) taskInChain.getContainingVertex();
+					worksetTailConfig = taskInChain.getTaskConfig();
+				} else {
+					worksetTailConfig = new TaskConfig(nextWorksetVertex.getConfiguration());
+				}
+
+				// mark the node to perform workset updates
+				worksetTailConfig.setIsWorksetIteration();
+				worksetTailConfig.setIsWorksetUpdate();
+
+				if (hasWorksetTail) {
+					nextWorksetVertex.setInvokableClass(IterationTailPactTask.class);
+					worksetTailConfig.setOutputSerializer(iterNode.getWorksetSerializer());
+				}
+			}
+			{
+				final TaskConfig solutionDeltaConfig;
+				AbstractJobVertex solutionDeltaVertex = (AbstractJobVertex) this.vertices.get(solutionDeltaNode);
+				if (solutionDeltaVertex == null) {
+					// last op is chained
+					TaskInChain taskInChain = this.chainedTasks.get(solutionDeltaNode);
+					if (taskInChain == null) {
+						throw new CompilerException("Bug: Solution Set Delta not found as vertex or chained task.");
+					}
+					solutionDeltaVertex = (AbstractJobVertex) taskInChain.getContainingVertex();
+					solutionDeltaConfig = taskInChain.getTaskConfig();
+				} else {
+					solutionDeltaConfig = new TaskConfig(solutionDeltaVertex.getConfiguration());
+				}
+
+				solutionDeltaConfig.setIsWorksetIteration();
+				solutionDeltaConfig.setIsSolutionSetUpdate();
+
+				if (hasSolutionSetTail) {
+					solutionDeltaVertex.setInvokableClass(IterationTailPactTask.class);
+					solutionDeltaConfig.setOutputSerializer(iterNode.getSolutionSetSerializer());
+
+					// tell the head that it needs to wait for the solution set updates
+					headConfig.setWaitForSolutionSetUpdate();
+				}
+				else {
+					// no tail, intermediate update. must be immediate update
+					if (!iterNode.isImmediateSolutionSetUpdate()) {
+						throw new CompilerException("A solution set update without dedicated tail is not set to perform immediate updates.");
+					}
+					solutionDeltaConfig.setIsSolutionSetUpdateWithoutReprobe();
+				}
+			}
+		}
+
+		// ------------------- register the aggregators -------------------
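+		// Workset iterations terminate implicitly once the workset is empty. That check is
+		// implemented via a built-in aggregator, so user-defined convergence criteria (and
+		// aggregator names clashing with the built-in one) are rejected below.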
+		AggregatorRegistry aggs = iterNode.getIterationNode().getIterationContract().getAggregators();
+		Collection<AggregatorWithName<?>> allAggregators = aggs.getAllRegisteredAggregators();
+
+		for (AggregatorWithName<?> agg : allAggregators) {
+			if (agg.getName().equals(WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME)) {
+				throw new CompilerException("User defined aggregator used the same name as built-in workset " +
+					"termination check aggregator: " + WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME);
+			}
+		}
+
+		headConfig.addIterationAggregators(allAggregators);
+		syncConfig.addIterationAggregators(allAggregators);
+
+		String convAggName = aggs.getConvergenceCriterionAggregatorName();
+		ConvergenceCriterion<?> convCriterion = aggs.getConvergenceCriterion();
+
+		if (convCriterion != null || convAggName != null) {
+			throw new CompilerException("Error: Cannot use a custom convergence criterion with a workset iteration. " +
+				"Workset iterations have an implicit convergence criterion: they terminate once the workset is empty.");
+		}
+
+		headConfig.addIterationAggregator(WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME, new LongSumAggregator());
+		syncConfig.addIterationAggregator(WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME, new LongSumAggregator());
+		syncConfig.setConvergenceCriterion(WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME, new WorksetEmptyConvergenceCriterion());
+	}
+
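+	/**
+	 * Tries to produce a human-readable description of the user code, preferring the
+	 * {@code toString()} of an instantiated user-code object and falling back to the
+	 * name of the user-code class. Returns {@code null} if neither can be obtained.
+	 */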
+	private static String getDescriptionForUserCode(UserCodeWrapper<?> wrapper) {
+		try {
+			if (wrapper.hasObject()) {
+				try {
+					return wrapper.getUserCodeObject().toString();
+				}
+				catch (Throwable t) {
+					return wrapper.getUserCodeClass().getName();
+				}
+			}
+			else {
+				return wrapper.getUserCodeClass().getName();
+			}
+		}
+		catch (Throwable t) {
+			return null;
+		}
+	}
+
+	// -------------------------------------------------------------------------------------
+	//  Descriptors for tasks / configurations that are chained or merged with other tasks
+	// -------------------------------------------------------------------------------------
+
+	/**
+	 * Utility class that describes a task in a sequence of chained tasks. Chained tasks are
+	 * tasks that run together in one thread.
+	 */
+	private static final class TaskInChain {
+
+		private final Class<? extends ChainedDriver<?, ?>> chainedTask;
+
+		private final TaskConfig taskConfig;
+
+		private final String taskName;
+
+		private AbstractJobVertex containingVertex;
+
+		TaskInChain(Class<? extends ChainedDriver<?, ?>> chainedTask, TaskConfig taskConfig, String taskName) {
+			this.chainedTask = chainedTask;
+			this.taskConfig = taskConfig;
+			this.taskName = taskName;
+		}
+
+		public Class<? extends ChainedDriver<?, ?>> getChainedTask() {
+			return this.chainedTask;
+		}
+
+		public TaskConfig getTaskConfig() {
+			return this.taskConfig;
+		}
+
+		public String getTaskName() {
+			return this.taskName;
+		}
+
+		public AbstractJobVertex getContainingVertex() {
+			return this.containingVertex;
+		}
+
+		public void setContainingVertex(AbstractJobVertex containingVertex) {
+			this.containingVertex = containingVertex;
+		}
+	}
+
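+	/**
+	 * Utility class that tracks the head task, the head task's configuration, and the
+	 * configuration for the head's final (non-iterative) output while an iteration is
+	 * translated into job graph vertices.
+	 */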
+	private static final class IterationDescriptor {
+
+		private final IterationPlanNode iterationNode;
+
+		private AbstractJobVertex headTask;
+
+		private TaskConfig headConfig;
+
+		private TaskConfig headFinalResultConfig;
+
+		private final int id;
+
+		public IterationDescriptor(IterationPlanNode iterationNode, int id) {
+			this.iterationNode = iterationNode;
+			this.id = id;
+		}
+
+		public IterationPlanNode getIterationNode() {
+			return iterationNode;
+		}
+
+		public void setHeadTask(AbstractJobVertex headTask, TaskConfig headConfig) {
+			this.headTask = headTask;
+			this.headFinalResultConfig = new TaskConfig(new Configuration());
+
+			// check if we already had a configuration (for example, when the solution set
+			// was configured before the head task was set) and merge it into the new one
+			if (this.headConfig != null) {
+				headConfig.getConfiguration().addAll(this.headConfig.getConfiguration());
+			}
+
+			this.headConfig = headConfig;
+		}
+
+		public AbstractJobVertex getHeadTask() {
+			return headTask;
+		}
+
+		public TaskConfig getHeadFinalResultConfig() {
+			return headFinalResultConfig;
+		}
+
+		public int getId() {
+			return this.id;
+		}
+	}
+}