Github user mbalassi commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/72#discussion_r15576637
  
    --- Diff: 
flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
 ---
    @@ -0,0 +1,825 @@
    +/**
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + *
    + */
    +
    +package org.apache.flink.streaming.api;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.io.ObjectOutputStream;
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.commons.lang3.SerializationUtils;
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +import org.apache.flink.api.java.tuple.Tuple;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.io.network.channels.ChannelType;
    +import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
    +import org.apache.flink.runtime.jobgraph.DistributionPattern;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobgraph.JobGraphDefinitionException;
    +import org.apache.flink.runtime.jobgraph.JobInputVertex;
    +import org.apache.flink.runtime.jobgraph.JobOutputVertex;
    +import org.apache.flink.runtime.jobgraph.JobTaskVertex;
    +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
    +import org.apache.flink.streaming.api.collector.OutputSelector;
    +import org.apache.flink.streaming.api.invokable.SinkInvokable;
    +import org.apache.flink.streaming.api.invokable.StreamComponentInvokable;
    +import org.apache.flink.streaming.api.invokable.UserSourceInvokable;
    +import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
    +import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
    +import org.apache.flink.streaming.api.streamcomponent.CoStreamTask;
    +import org.apache.flink.streaming.api.streamcomponent.StreamIterationSink;
    +import 
org.apache.flink.streaming.api.streamcomponent.StreamIterationSource;
    +import org.apache.flink.streaming.api.streamcomponent.StreamSink;
    +import org.apache.flink.streaming.api.streamcomponent.StreamSource;
    +import org.apache.flink.streaming.api.streamcomponent.StreamTask;
    +import org.apache.flink.streaming.partitioner.BroadcastPartitioner;
    +import org.apache.flink.streaming.partitioner.DistributePartitioner;
    +import org.apache.flink.streaming.partitioner.FieldsPartitioner;
    +import org.apache.flink.streaming.partitioner.ForwardPartitioner;
    +import org.apache.flink.streaming.partitioner.GlobalPartitioner;
    +import org.apache.flink.streaming.partitioner.ShufflePartitioner;
    +import org.apache.flink.streaming.partitioner.StreamPartitioner;
    +
    +/**
    + * Object for building Flink stream processing job graphs
    + */
    +public class JobGraphBuilder {
    +
    +   private static final Log LOG = LogFactory.getLog(JobGraphBuilder.class);
    +   private final JobGraph jobGraph;
    +
    +   // Graph attributes
    +   private Map<String, AbstractJobVertex> components;
    +   private Map<String, Integer> componentParallelism;
    +   private Map<String, ArrayList<String>> outEdgeList;
    +   private Map<String, ArrayList<Integer>> outEdgeType;
    +   private Map<String, Boolean> mutability;
    +   private Map<String, List<String>> inEdgeList;
    +   private Map<String, List<StreamPartitioner<? extends Tuple>>> 
connectionTypes;
    +   private Map<String, String> userDefinedNames;
    +   private Map<String, String> operatorNames;
    +   private Map<String, StreamComponentInvokable> invokableObjects;
    +   private Map<String, byte[]> serializedFunctions;
    +   private Map<String, byte[]> outputSelectors;
    +   private Map<String, Class<? extends AbstractInvokable>> 
componentClasses;
    +   private Map<String, String> iterationIds;
    +   private Map<String, String> iterationHeadNames;
    +   private Map<String, Integer> iterationTailCount;
    +
    +   private String maxParallelismVertexName;
    +   private int maxParallelism;
    +
    +   /**
    +    * Creates a new {@link JobGraph} with the given name. A JobGraph is a DAG
    +    * and consists of sources, tasks (intermediate vertices) and sinks. A
    +    * JobGraph must contain at least a source and a sink.
    +    * <p>
    +    * Besides the JobGraph itself, the constructor initializes every
    +    * bookkeeping map of this builder to an empty {@link HashMap} and resets
    +    * the maximum-parallelism tracking (empty vertex name, value 0). A debug
    +    * message is logged when debug logging is enabled.
    +    * 
    +    * @param jobGraphName
    +    *            Name of the JobGraph
    +    */
    +   public JobGraphBuilder(String jobGraphName) {
    +
    +           jobGraph = new JobGraph(jobGraphName);
    +
    +           components = new HashMap<String, AbstractJobVertex>();
    +           componentParallelism = new HashMap<String, Integer>();
    +           outEdgeList = new HashMap<String, ArrayList<String>>();
    +           outEdgeType = new HashMap<String, ArrayList<Integer>>();
    +           mutability = new HashMap<String, Boolean>();
    +           inEdgeList = new HashMap<String, List<String>>();
    +           connectionTypes = new HashMap<String, List<StreamPartitioner<? 
extends Tuple>>>();
    +           userDefinedNames = new HashMap<String, String>();
    +           operatorNames = new HashMap<String, String>();
    +           invokableObjects = new HashMap<String, 
StreamComponentInvokable>();
    +           serializedFunctions = new HashMap<String, byte[]>();
    +           outputSelectors = new HashMap<String, byte[]>();
    +           componentClasses = new HashMap<String, Class<? extends 
AbstractInvokable>>();
    +           iterationIds = new HashMap<String, String>();
    +           iterationHeadNames = new HashMap<String, String>();
    +           iterationTailCount = new HashMap<String, Integer>();
    +
    +           maxParallelismVertexName = "";
    +           maxParallelism = 0;
    +           if (LOG.isDebugEnabled()) {
    +                   LOG.debug("JobGraph created");
    +           }
    +   }
    +
    +   /**
    +    * Adds a source to the JobGraph with the given parameters. The component
    +    * is registered through addComponent with {@link StreamSource} as its
    +    * vertex class; its name is logged when debug logging is enabled.
    +    * 
    +    * @param componentName
    +    *            Name of the component
    +    * @param InvokableObject
    +    *            User defined operator
    +    * @param operatorName
    +    *            Operator type
    +    * @param serializedFunction
    +    *            Serialized udf
    +    * @param parallelism
    +    *            Number of parallel instances created
    +    */
    +   public void addSource(String componentName,
    +                   UserSourceInvokable<? extends Tuple> InvokableObject, 
String operatorName,
    +                   byte[] serializedFunction, int parallelism) {
    +
    +           addComponent(componentName, StreamSource.class, 
InvokableObject, operatorName,
    +                           serializedFunction, parallelism);
    +
    +           if (LOG.isDebugEnabled()) {
    +                   LOG.debug("SOURCE: " + componentName);
    +           }
    +   }
    +
    +   /**
    +    * Adds an iteration source, attached to the iteration head, to the
    +    * {@link JobGraph}. The iterated tuples will be fed from this component
    +    * back to the graph.
    +    * <p>
    +    * The component is registered with {@link StreamIterationSource} as its
    +    * vertex class and with null invokable, operator name and serialized
    +    * function. The iteration ID is stored for the component, and the
    +    * component name is stored as the head name of that iteration ID.
    +    * Afterwards setBytesFrom(iterationHead, componentName) is called and
    +    * setEdge is called with the new source as first and the iteration head
    +    * as second argument, passing the first partitioner stored under the
    +    * first input component of the iteration head and 0 as last argument.
    +    * <p>
    +    * NOTE(review): the partitioner lookup requires that iterationHead was
    +    * already added and has at least one input whose partitioner list is not
    +    * empty; otherwise the chained get calls throw. setBytesFrom and setEdge
    +    * are defined outside this hunk; presumably setBytesFrom copies the
    +    * serialized type information between the two components - confirm.
    +    * 
    +    * @param componentName
    +    *            Name of the component
    +    * @param iterationHead
    +    *            Id of the iteration head
    +    * @param iterationID
    +    *            ID of iteration for multiple iterations
    +    * @param parallelism
    +    *            Number of parallel instances created
    +    */
    +   public void addIterationSource(String componentName, String 
iterationHead, String iterationID,
    +                   int parallelism) {
    +
    +           addComponent(componentName, StreamIterationSource.class, null, 
null, null, parallelism);
    +           iterationIds.put(componentName, iterationID);
    +           iterationHeadNames.put(iterationID, componentName);
    +
    +           setBytesFrom(iterationHead, componentName);
    +
    +           setEdge(componentName, iterationHead,
    +                           
connectionTypes.get(inEdgeList.get(iterationHead).get(0)).get(0), 0);
    +
    +           if (LOG.isDebugEnabled()) {
    +                   LOG.debug("ITERATION SOURCE: " + componentName);
    +           }
    +   }
    +
    +   /**
    +    * Adds a task to the JobGraph with the given parameters. The component
    +    * is registered through addComponent with {@link StreamTask} as its
    +    * vertex class; its name is logged when debug logging is enabled.
    +    * 
    +    * @param componentName
    +    *            Name of the component
    +    * @param taskInvokableObject
    +    *            User defined operator
    +    * @param operatorName
    +    *            Operator type
    +    * @param serializedFunction
    +    *            Serialized udf
    +    * @param parallelism
    +    *            Number of parallel instances created
    +    */
    +   public <IN extends Tuple, OUT extends Tuple> void addTask(String 
componentName,
    +                   UserTaskInvokable<IN, OUT> taskInvokableObject, String 
operatorName,
    +                   byte[] serializedFunction, int parallelism) {
    +
    +           addComponent(componentName, StreamTask.class, 
taskInvokableObject, operatorName,
    +                           serializedFunction, parallelism);
    +
    +           if (LOG.isDebugEnabled()) {
    +                   LOG.debug("TASK: " + componentName);
    +           }
    +   }
    +
    +   public <IN1 extends Tuple, IN2 extends Tuple, OUT extends Tuple> void 
addCoTask(
    +                   String componentName, CoInvokable<IN1, IN2, OUT> 
taskInvokableObject,
    +                   String operatorName, byte[] serializedFunction, int 
parallelism) {
    +           /*
    +            * Counterpart of addTask for a CoInvokable (type parameters IN1,
    +            * IN2 and OUT): the component is registered through addComponent
    +            * with CoStreamTask as its vertex class, using the same
    +            * componentName, operatorName, serializedFunction and parallelism
    +            * arguments as the other add methods of this class.
    +            */
    +           addComponent(componentName, CoStreamTask.class, 
taskInvokableObject, operatorName,
    +                           serializedFunction, parallelism);
    +
    +           if (LOG.isDebugEnabled()) {
    +                   LOG.debug("CO-TASK: " + componentName);
    +           }
    +   }
    +
    +   /**
    +    * Adds a sink to the JobGraph with the given parameters. The component is
    +    * registered through addComponent with {@link StreamSink} as its vertex
    +    * class; its name is logged when debug logging is enabled.
    +    * 
    +    * @param componentName
    +    *            Name of the component
    +    * @param InvokableObject
    +    *            User defined operator
    +    * @param operatorName
    +    *            Operator type
    +    * @param serializedFunction
    +    *            Serialized udf
    +    * @param parallelism
    +    *            Number of parallel instances created
    +    */
    +   public void addSink(String componentName, SinkInvokable<? extends 
Tuple> InvokableObject,
    +                   String operatorName, byte[] serializedFunction, int 
parallelism) {
    +
    +           addComponent(componentName, StreamSink.class, InvokableObject, 
operatorName,
    +                           serializedFunction, parallelism);
    +
    +           if (LOG.isDebugEnabled()) {
    +                   LOG.debug("SINK: " + componentName);
    +           }
    +
    +   }
    +
    +   /**
    +    * Adds a sink to an iteration tail to the {@link JobGraph}. The tuples
    +    * intended to be iterated will be sent to this sink from the iteration
    +    * head.
    +    * <p>
    +    * The component is registered with {@link StreamIterationSink} as its
    +    * vertex class and with null invokable, operator name and serialized
    +    * function. The iteration ID is stored for the component and
    +    * setBytesFrom(iterationTail, componentName) is called. Finally
    +    * setUserDefinedName is called with directName, or with the literal name
    +    * iterate when directName is null.
    +    * <p>
    +    * NOTE(review): setBytesFrom and setUserDefinedName are defined outside
    +    * this hunk, so their exact effect cannot be confirmed from here.
    +    * 
    +    * @param componentName
    +    *            Name of the component
    +    * @param iterationTail
    +    *            Id of the iteration tail
    +    * @param iterationID
    +    *            ID of iteration for multiple iterations
    +    * @param parallelism
    +    *            Number of parallel instances created
    +    * @param directName
    +    *            Id of the output direction; may be null, in which case the
    +    *            name iterate is used
    +    */
    +   public void addIterationSink(String componentName, String 
iterationTail, String iterationID,
    +                   int parallelism, String directName) {
    +
    +           addComponent(componentName, StreamIterationSink.class, null, 
null, null, parallelism);
    +           iterationIds.put(componentName, iterationID);
    +           setBytesFrom(iterationTail, componentName);
    +
    +           if (directName != null) {
    +                   setUserDefinedName(componentName, directName);
    +           } else {
    +                   setUserDefinedName(componentName, "iterate");
    +           }
    +
    +           if (LOG.isDebugEnabled()) {
    +                   LOG.debug("ITERATION SINK: " + componentName);
    +           }
    +
    +   }
    +
    +   /**
    +    * Records the parameters of a component in the bookkeeping maps of this
    +    * builder. No vertex is created here; createVertex reads these entries
    +    * back when it builds the vertex for the component.
    +    * <p>
    +    * In addition to the given parameters, the component is marked as not
    +    * mutable, receives empty out-edge, out-edge-type, in-edge and
    +    * partitioner lists, and its iteration tail count is set to 0. The
    +    * parallelism is handed to setParallelism, which is defined outside this
    +    * hunk. Existing entries stored under the same component name are
    +    * overwritten by the put calls.
    +    * 
    +    * @param componentName
    +    *            Name of the component
    +    * @param componentClass
    +    *            The class of the vertex
    +    * @param invokableObject
    +    *            The user defined invokable object; callers in this class
    +    *            pass null for iteration sources and sinks
    +    * @param operatorName
    +    *            Type of the user defined operator
    +    * @param serializedFunction
    +    *            Serialized operator
    +    * @param parallelism
    +    *            Number of parallel instances created
    +    */
    +   private void addComponent(String componentName,
    +                   Class<? extends AbstractInvokable> componentClass,
    +                   StreamComponentInvokable invokableObject, String 
operatorName,
    +                   byte[] serializedFunction, int parallelism) {
    +
    +           componentClasses.put(componentName, componentClass);
    +           setParallelism(componentName, parallelism);
    +           mutability.put(componentName, false);
    +           invokableObjects.put(componentName, invokableObject);
    +           operatorNames.put(componentName, operatorName);
    +           serializedFunctions.put(componentName, serializedFunction);
    +           outEdgeList.put(componentName, new ArrayList<String>());
    +           outEdgeType.put(componentName, new ArrayList<Integer>());
    +           inEdgeList.put(componentName, new ArrayList<String>());
    +           connectionTypes.put(componentName, new 
ArrayList<StreamPartitioner<? extends Tuple>>());
    +           iterationTailCount.put(componentName, 0);
    +   }
    +
    +   /**
    +    * Creates an {@link AbstractJobVertex} in the {@link JobGraph} and sets
    +    * its config parameters using the ones set previously.
    +    * <p>
    +    * The vertex type follows the component class: StreamSource and
    +    * StreamIterationSource give a {@link JobInputVertex}, StreamTask and
    +    * CoStreamTask a {@link JobTaskVertex}, StreamSink and
    +    * StreamIterationSink a {@link JobOutputVertex}. The invokable class and
    +    * the number of subtasks are set on the vertex, and the following config
    +    * entries are written: isMutable and componentName always; userfunction
    +    * plus the serialized invokable when an invokable object is present;
    +    * operator and operatorName when a serialized function is present;
    +    * userDefinedName when one was set; directedEmit and outputSelector when
    +    * an output selector is present; iteration-id for iteration sources and
    +    * sinks. The vertex is stored in the components map, and the
    +    * maximum-parallelism tracking is updated when this parallelism is
    +    * strictly larger than the current maximum.
    +    * <p>
    +    * NOTE(review): if the component class is none of the six classes above
    +    * the vertex stays null and the following setInvokableClass call throws
    +    * a NullPointerException. The same holds for the unboxing of the
    +    * parallelism and mutability entries when the component name is unknown.
    +    * 
    +    * @param componentName
    +    *            Name of the component for which the vertex will be created.
    +    */
    +   private void createVertex(String componentName) {
    +
    +           // Get vertex attributes
    +           Class<? extends AbstractInvokable> componentClass = 
componentClasses.get(componentName);
    +           StreamComponentInvokable invokableObject = 
invokableObjects.get(componentName);
    +           String operatorName = operatorNames.get(componentName);
    +           byte[] serializedFunction = 
serializedFunctions.get(componentName);
    +           int parallelism = componentParallelism.get(componentName);
    +           byte[] outputSelector = outputSelectors.get(componentName);
    +           String userDefinedName = userDefinedNames.get(componentName);
    +
    +           // Create vertex object
    +           AbstractJobVertex component = null;
    +           if (componentClass.equals(StreamSource.class)
    +                           || 
componentClass.equals(StreamIterationSource.class)) {
    +                   component = new JobInputVertex(componentName, 
this.jobGraph);
    +           } else if (componentClass.equals(StreamTask.class)
    +                           || componentClass.equals(CoStreamTask.class)) {
    +                   component = new JobTaskVertex(componentName, 
this.jobGraph);
    +           } else if (componentClass.equals(StreamSink.class)
    +                           || 
componentClass.equals(StreamIterationSink.class)) {
    +                   component = new JobOutputVertex(componentName, 
this.jobGraph);
    +           }
    +
    +           component.setInvokableClass(componentClass);
    +           component.setNumberOfSubtasks(parallelism);
    +           if (LOG.isDebugEnabled()) {
    +                   LOG.debug("Parallelism set: " + parallelism + " for " + 
componentName);
    +           }
    +
    +           Configuration config = component.getConfiguration();
    +
    +           config.setBoolean("isMutable", mutability.get(componentName));
    +
    +           // Set vertex config
    +           if (invokableObject != null) {
    +                   config.setClass("userfunction", 
invokableObject.getClass());
    +                   addSerializedObject(invokableObject, config);
    +           }
    +           config.setString("componentName", componentName);
    +           if (serializedFunction != null) {
    +                   config.setBytes("operator", serializedFunction);
    +                   config.setString("operatorName", operatorName);
    +           }
    +
    +           if (userDefinedName != null) {
    +                   config.setString("userDefinedName", userDefinedName);
    +           }
    +
    +           if (outputSelector != null) {
    +                   config.setBoolean("directedEmit", true);
    +                   config.setBytes("outputSelector", outputSelector);
    +           }
    +
    +           if (componentClass.equals(StreamIterationSource.class)
    +                           || 
componentClass.equals(StreamIterationSink.class)) {
    +                   config.setString("iteration-id", 
iterationIds.get(componentName));
    +           }
    +
    +           components.put(componentName, component);
    +
    +           if (parallelism > maxParallelism) {
    +                   maxParallelism = parallelism;
    +                   maxParallelismVertexName = componentName;
    +           }
    +   }
    +
    +   /**
    +    * Adds serialized invokable object to the JobVertex configuration
    +    * 
    +    * @param invokableObject
    +    *            Invokable object to serialize
    +    * @param config
    +    *            JobVertex configuration to which the serialized invokable 
will
    +    *            be added
    +    */
    +   private void addSerializedObject(Serializable invokableObject, 
Configuration config) {
    +
    +           ByteArrayOutputStream baos = null;
    +           ObjectOutputStream oos = null;
    +           try {
    +                   baos = new ByteArrayOutputStream();
    +
    +                   oos = new ObjectOutputStream(baos);
    +
    +                   oos.writeObject(invokableObject);
    +
    +                   config.setBytes("serializedudf", baos.toByteArray());
    --- End diff --
    
    @gaborhermann extracted these statements to a StreamConfig. It is under 
review as of now. Thanks for the suggestion.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to