[ 
https://issues.apache.org/jira/browse/FLINK-2837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15024345#comment-15024345
 ] 

ASF GitHub Bot commented on FLINK-2837:
---------------------------------------

Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1398#discussion_r45726538
  
    --- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java
 ---
    @@ -15,75 +16,468 @@
      * See the License for the specific language governing permissions and
      * limitations under the License.
      */
    -
     package org.apache.flink.storm.api;
     
    +import backtype.storm.generated.ComponentCommon;
    +import backtype.storm.generated.GlobalStreamId;
    +import backtype.storm.generated.Grouping;
     import backtype.storm.generated.StormTopology;
    +import backtype.storm.topology.IRichBolt;
    +import backtype.storm.topology.IRichSpout;
    +import backtype.storm.topology.IRichStateSpout;
    +import backtype.storm.topology.TopologyBuilder;
    +import backtype.storm.tuple.Fields;
    +import com.google.common.base.Preconditions;
     import org.apache.flink.api.common.JobExecutionResult;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.tuple.Tuple;
    +import org.apache.flink.api.java.typeutils.TypeExtractor;
    +import org.apache.flink.storm.util.SplitStreamMapper;
    +import org.apache.flink.storm.util.SplitStreamType;
    +import org.apache.flink.storm.util.StormStreamSelector;
    +import org.apache.flink.storm.wrappers.BoltWrapper;
    +import org.apache.flink.storm.wrappers.BoltWrapperTwoInput;
    +import org.apache.flink.storm.wrappers.SpoutWrapper;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.datastream.DataStreamSource;
    +import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    +import org.apache.flink.streaming.api.datastream.SplitStream;
     import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.util.InstantiationUtil;
    +
    +import java.io.IOException;
    +import java.lang.reflect.Field;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Set;
     
     /**
    - * {@link FlinkTopology} mimics a {@link StormTopology} and is implemented 
in terms of a {@link
    - * StreamExecutionEnvironment} . In contrast to a regular {@link 
StreamExecutionEnvironment}, a {@link FlinkTopology}
    - * cannot be executed directly, but must be handed over to a {@link 
FlinkLocalCluster}, {@link FlinkSubmitter}, or
    - * {@link FlinkClient}.
    + * {@link FlinkTopology} translates a {@link TopologyBuilder} to a Flink 
program.
    + * <strong>CAUTION: {@link IRichStateSpout StateSpout}s are currently not 
supported.</strong>
      */
    -public class FlinkTopology extends StreamExecutionEnvironment {
    +public class FlinkTopology {
    +
    +   /** All declared streams and output schemas by operator ID */
    +   private final HashMap<String, HashMap<String, Fields>> outputStreams = 
new HashMap<String, HashMap<String, Fields>>();
    +   /** All spouts&bolts declarers by their ID */
    +   private final HashMap<String, FlinkOutputFieldsDeclarer> declarers = 
new HashMap<String, FlinkOutputFieldsDeclarer>();
    +
    +   private final HashMap<String, Set<Entry<GlobalStreamId, Grouping>>> 
unprocessdInputsPerBolt =
    +                   new HashMap<String, Set<Entry<GlobalStreamId, 
Grouping>>>();
    +
    +   final HashMap<String, HashMap<String, DataStream<Tuple>>> 
availableInputs = new HashMap<>();
     
    -   /** The number of declared tasks for the whole program (ie, sum over 
all dops) */
    -   private int numberOfTasks = 0;
    +   private final TopologyBuilder builder;
     
    -   public FlinkTopology() {
    -           // Set default parallelism to 1, to mirror Storm default 
behavior
    -           super.setParallelism(1);
    +   // needs to be a class member for internal testing purpose
    +   private final StormTopology stormTopology;
    +
    +   private final Map<String, IRichSpout> spouts;
    +   private final Map<String, IRichBolt> bolts;
    +
    +   private final StreamExecutionEnvironment env;
    +
    +   private FlinkTopology(TopologyBuilder builder) {
    +           this.builder = builder;
    +           this.stormTopology = builder.createTopology();
    +           // extract the spouts and bolts
    +           this.spouts = getPrivateField("_spouts");
    +           this.bolts = getPrivateField("_bolts");
    +
    +           this.env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +           // Kick off the translation immediately
    +           translateTopology();
        }
     
        /**
    -    * Is not supported. In order to execute use {@link FlinkLocalCluster}, 
{@link FlinkSubmitter}, or {@link
    -    * FlinkClient}.
         *
    -    * @throws UnsupportedOperationException
    -    *              at every invocation
    +    * Creates a Flink program that uses the specified spouts and bolts.
    +    * @param stormBuilder The storm topology builder to use for creating 
the Flink topology.
    +    * @return A Flink Topology which may be executed.
         */
    -   @Override
    -   public JobExecutionResult execute() throws Exception {
    -           throw new UnsupportedOperationException(
    -                           "A FlinkTopology cannot be executed directly. 
Use FlinkLocalCluster, FlinkSubmitter, or FlinkClient " +
    -                           "instead.");
    +   public static FlinkTopology createTopology(TopologyBuilder 
stormBuilder) {
    +           return new FlinkTopology(stormBuilder);
        }
     
        /**
    -    * Is not supported. In order to execute use {@link FlinkLocalCluster}, 
{@link FlinkSubmitter} or {@link
    -    * FlinkClient}.
    -    *
    -    * @throws UnsupportedOperationException
    -    *              at every invocation
    +    * Returns the underlying Flink ExecutionEnvironment for the Storm 
topology.
    +    * @return The contextual environment.
         */
    -   @Override
    -   public JobExecutionResult execute(final String jobName) throws 
Exception {
    -           throw new UnsupportedOperationException(
    -                           "A FlinkTopology cannot be executed directly. 
Use FlinkLocalCluster, FlinkSubmitter, or FlinkClient " +
    -                           "instead.");
    +   public StreamExecutionEnvironment getExecutionEnvironment() {
    +           return this.env;
        }
     
        /**
    -    * Increased the number of declared tasks of this program by the given 
value.
    -    *
    -    * @param dop
    -    *              The dop of a new operator that increases the number of 
overall tasks.
    +    * Directly executes the Storm topology based on the current context 
(local when in IDE and
    +    * remote when executed thorugh ./bin/flink).
    +    * @return The execution result
    +    * @throws Exception
         */
    -   public void increaseNumberOfTasks(final int dop) {
    -           assert (dop > 0);
    -           this.numberOfTasks += dop;
    +   public JobExecutionResult execute() throws Exception {
    +           return env.execute();
    +   }
    +
    +
    +   @SuppressWarnings("unchecked")
    +   private <T> Map<String, T> getPrivateField(String field) {
    +           try {
    +                   Field f = builder.getClass().getDeclaredField(field);
    +                   f.setAccessible(true);
    +                   return copyObject((Map<String, T>) f.get(builder));
    +           } catch (NoSuchFieldException | IllegalAccessException e) {
    +                   throw new RuntimeException("Couldn't get " + field + " 
from TopologyBuilder", e);
    +           }
    +   }
    +
    +   private <T> T copyObject(T object) {
    +           try {
    +                   return InstantiationUtil.deserializeObject(
    +                                   
InstantiationUtil.serializeObject(object),
    +                                   getClass().getClassLoader()
    +                   );
    +           } catch (IOException | ClassNotFoundException e) {
    +                   throw new RuntimeException("Failed to copy object.");
    +           }
        }
     
        /**
    -    * Return the number or required tasks to execute this program.
    -    *
    -    * @return the number or required tasks to execute this program
    +    * Creates a Flink program that uses the specified spouts and bolts.
         */
    -   public int getNumberOfTasks() {
    -           return this.numberOfTasks;
    +   private void translateTopology() {
    +
    +           unprocessdInputsPerBolt.clear();
    +           outputStreams.clear();
    +           declarers.clear();
    +           availableInputs.clear();
    +
    +           // Storm defaults to parallelism 1
    +           env.setParallelism(1);
    +
    +           /* Translation of topology */
    +
    +
    +           for (final Entry<String, IRichSpout> spout : spouts.entrySet()) 
{
    +                   final String spoutId = spout.getKey();
    +                   final IRichSpout userSpout = spout.getValue();
    +
    +                   final FlinkOutputFieldsDeclarer declarer = new 
FlinkOutputFieldsDeclarer();
    +                   userSpout.declareOutputFields(declarer);
    +                   final HashMap<String,Fields> sourceStreams = 
declarer.outputStreams;
    +                   this.outputStreams.put(spoutId, sourceStreams);
    +                   declarers.put(spoutId, declarer);
    +
    +
    +                   final HashMap<String, DataStream<Tuple>> outputStreams 
= new HashMap<String, DataStream<Tuple>>();
    +                   final DataStreamSource<?> source;
    +
    +                   if (sourceStreams.size() == 1) {
    +                           final SpoutWrapper<Tuple> 
spoutWrapperSingleOutput = new SpoutWrapper<Tuple>(userSpout);
    +                           
spoutWrapperSingleOutput.setStormTopology(stormTopology);
    +
    +                           final String outputStreamId = (String) 
sourceStreams.keySet().toArray()[0];
    +
    +                           DataStreamSource<Tuple> src = 
env.addSource(spoutWrapperSingleOutput, spoutId,
    +                                           
declarer.getOutputType(outputStreamId));
    +
    +                           outputStreams.put(outputStreamId, src);
    +                           source = src;
    +                   } else {
    +                           final SpoutWrapper<SplitStreamType<Tuple>> 
spoutWrapperMultipleOutputs = new SpoutWrapper<SplitStreamType<Tuple>>(
    +                                           userSpout);
    +                           
spoutWrapperMultipleOutputs.setStormTopology(stormTopology);
    +
    +                           @SuppressWarnings({ "unchecked", "rawtypes" })
    +                           DataStreamSource<SplitStreamType<Tuple>> 
multiSource = env.addSource(
    +                                           spoutWrapperMultipleOutputs, 
spoutId,
    +                                           (TypeInformation) 
TypeExtractor.getForClass(SplitStreamType.class));
    +
    +                           SplitStream<SplitStreamType<Tuple>> splitSource 
= multiSource
    +                                           .split(new 
StormStreamSelector<Tuple>());
    +                           for (String streamId : sourceStreams.keySet()) {
    +                                   outputStreams.put(streamId, 
splitSource.select(streamId).map(new SplitStreamMapper<Tuple>()));
    +                           }
    +                           source = multiSource;
    +                   }
    +                   availableInputs.put(spoutId, outputStreams);
    +
    +                   final ComponentCommon common = 
stormTopology.get_spouts().get(spoutId).get_common();
    +                   if (common.is_set_parallelism_hint()) {
    +                           int dop = common.get_parallelism_hint();
    +                           source.setParallelism(dop);
    +                   } else {
    +                           common.set_parallelism_hint(1);
    +                   }
    +           }
    +
    +           /**
    +           * 1. Connect all spout streams with bolts streams
    +           * 2. Then proceed with the bolts stream already connected
    +           *
    +           *  Because we do not know the order in which an iterator steps 
over a set, we might process a consumer before
    +           * its producer
    +           * ->thus, we might need to repeat multiple times
    +           */
    +           boolean makeProgress = true;
    +           while (bolts.size() > 0) {
    +                   if (!makeProgress) {
    +                           throw new RuntimeException(
    +                                           "Unable to build Topology. 
Could not connect the following bolts: "
    +                                                           + 
bolts.keySet());
    +                   }
    +                   makeProgress = false;
    +
    +                   final Iterator<Entry<String, IRichBolt>> boltsIterator 
= bolts.entrySet().iterator();
    +                   while (boltsIterator.hasNext()) {
    +
    +                           final Entry<String, IRichBolt> bolt = 
boltsIterator.next();
    +                           final String boltId = bolt.getKey();
    +                           final IRichBolt userBolt = 
copyObject(bolt.getValue());
    +
    +                           final ComponentCommon common = 
stormTopology.get_bolts().get(boltId).get_common();
    +
    +                           Set<Entry<GlobalStreamId, Grouping>> 
unprocessedBoltInputs = unprocessdInputsPerBolt.get(boltId);
    +                           if (unprocessedBoltInputs == null) {
    +                                   unprocessedBoltInputs = new HashSet<>();
    +                                   
unprocessedBoltInputs.addAll(common.get_inputs().entrySet());
    +                                   unprocessdInputsPerBolt.put(boltId, 
unprocessedBoltInputs);
    +                           }
    +
    +                           // check if all inputs are available
    +                           final int numberOfInputs = 
unprocessedBoltInputs.size();
    +                           int inputsAvailable = 0;
    +                           for (Entry<GlobalStreamId, Grouping> entry : 
unprocessedBoltInputs) {
    +                                   final String producerId = 
entry.getKey().get_componentId();
    +                                   final String streamId = 
entry.getKey().get_streamId();
    +                                   final HashMap<String, 
DataStream<Tuple>> streams = availableInputs.get(producerId);
    +                                   if (streams != null && 
streams.get(streamId) != null) {
    +                                           inputsAvailable++;
    +                                   }
    +                           }
    +
    +                           if (inputsAvailable != numberOfInputs) {
    +                                   // traverse other bolts first until 
inputs are available
    +                                   continue;
    +                           } else {
    +                                   makeProgress = true;
    +                                   boltsIterator.remove();
    +                           }
    +
    +                           final Map<GlobalStreamId, DataStream<Tuple>> 
inputStreams = new HashMap<>(numberOfInputs);
    +
    +                           for (Entry<GlobalStreamId, Grouping> input : 
unprocessedBoltInputs) {
    +                                   final GlobalStreamId streamId = 
input.getKey();
    +                                   final Grouping grouping = 
input.getValue();
    +
    +                                   final String producerId = 
streamId.get_componentId();
    +
    +                                   final Map<String, DataStream<Tuple>> 
producer = availableInputs.get(producerId);
    +
    +                                   inputStreams.put(streamId, 
processInput(boltId, userBolt, streamId, grouping, producer));
    +                           }
    +
    +                           final Iterator<Entry<GlobalStreamId, 
DataStream<Tuple>>> iterator = inputStreams.entrySet().iterator();
    +
    +                           final Entry<GlobalStreamId, DataStream<Tuple>> 
firstInput = iterator.next();
    +                           GlobalStreamId streamId = firstInput.getKey();
    +                           DataStream<Tuple> inputStream = 
firstInput.getValue();
    +
    +                           final SingleOutputStreamOperator<?, ?> 
outputStream;
    +
    +                           switch (numberOfInputs) {
    +                                   case 1:
    +                                           outputStream = 
createOutput(boltId, userBolt, streamId, inputStream);
    +                                           break;
    +                                   case 2:
    +                                           Entry<GlobalStreamId, 
DataStream<Tuple>> secondInput = iterator.next();
    +                                           GlobalStreamId streamId2 = 
secondInput.getKey();
    +                                           DataStream<Tuple> inputStream2 
= secondInput.getValue();
    +                                           outputStream = 
createOutput(boltId, userBolt, streamId, inputStream, streamId2, inputStream2);
    +                                           break;
    +                                   default:
    +                                           throw new 
UnsupportedOperationException("Don't know how to translate a bolt "
    +                                                           + boltId + " 
with " + numberOfInputs + " inputs.");
    +                           }
    +
    +                           if (common.is_set_parallelism_hint()) {
    +                                   int dop = common.get_parallelism_hint();
    +                                   outputStream.setParallelism(dop);
    +                           } else {
    +                                   common.set_parallelism_hint(1);
    +                           }
    +
    +                   }
    +           }
        }
     
    +   private DataStream<Tuple> processInput(String boltId, IRichBolt 
userBolt,
    +                                                                           
GlobalStreamId streamId, Grouping grouping,
    +                                                                           
Map<String, DataStream<Tuple>> producer) {
    +
    +           Preconditions.checkNotNull(userBolt);
    --- End diff --
    
    I would use `assert` here because it a private method.


> FlinkTopologyBuilder cannot handle multiple input streams
> ---------------------------------------------------------
>
>                 Key: FLINK-2837
>                 URL: https://issues.apache.org/jira/browse/FLINK-2837
>             Project: Flink
>          Issue Type: Bug
>          Components: Storm Compatibility
>            Reporter: Matthias J. Sax
>            Assignee: Maximilian Michels
>
> FlinkTopologyBuilder cannot handle multiple input streams correctly. Instead 
> of union the incoming streams, it replicates the consuming bolt and each 
> (logical) instance processes one of the input streams.
> For example:
> {noformat}
> final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
> builder.setSpout(spoutId1, new FiniteRandomSpout(0, 10));
> builder.setSpout(spoutId2, new FiniteRandomSpout(1, 8));
> builder.setSpout(spoutId3, new FiniteRandomSpout(2, 13));
> builder.setBolt(boltId, new MergerBolt())
>       .shuffleGrouping(spoutId1)
>       .shuffleGrouping(spoutId2)
>       .shuffleGrouping(spoutId3);
> builder.setBolt("sink", new BoltPrintSink(new SimpleOutputFormatter()))
>       .shuffleGrouping(boltId);
> {noformat}
> will only print the data from a single source instead of all sources.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to