http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/AbstractStormCollector.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/AbstractStormCollector.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/AbstractStormCollector.java new file mode 100644 index 0000000..85d895c --- /dev/null +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/AbstractStormCollector.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.storm.wrappers; + +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple0; +import org.apache.flink.api.java.tuple.Tuple25; +import org.apache.flink.storm.util.SplitStreamType; + +import java.util.HashMap; +import java.util.List; +import java.util.Map.Entry; + +/** + * A {@link AbstractStormCollector} transforms Storm tuples to Flink tuples. + */ +abstract class AbstractStormCollector<OUT> { + + /** Flink output tuple of concrete type {@link Tuple0} to {@link Tuple25} per output stream. */ + protected final HashMap<String, Tuple> outputTuple = new HashMap<String, Tuple>(); + /** Flink split tuple. Used, if multiple output streams are declared. */ + private final SplitStreamType<Object> splitTuple = new SplitStreamType<Object>(); + /** + * The number of attributes of the output tuples per stream. (Determines the concrete type of {@link #outputTuple}). + * If {@link #numberOfAttributes} is zero, {@link #outputTuple} is not used and "raw" data type is used. + */ + protected final HashMap<String, Integer> numberOfAttributes; + /** Indicates of multiple output stream are declared and thus {@link SplitStreamType} must be used as output. */ + private final boolean split; + /** Is set to {@code true} each time a tuple is emitted. */ + boolean tupleEmitted = false; + + /** + * Instantiates a new {@link AbstractStormCollector} that emits Flink tuples via {@link #doEmit(Object)}. If the + * number of attributes is negative, any output type is supported (ie, raw type). If the number of attributes is + * between 0 and 25, the output type is {@link Tuple0} to {@link Tuple25}, respectively. + * + * @param numberOfAttributes + * The number of attributes of the emitted tuples per output stream. 
+ * @throws UnsupportedOperationException + * if the specified number of attributes is greater than 25 + */ + AbstractStormCollector(final HashMap<String, Integer> numberOfAttributes) + throws UnsupportedOperationException { + assert (numberOfAttributes != null); + + this.numberOfAttributes = numberOfAttributes; + this.split = this.numberOfAttributes.size() > 1; + + for (Entry<String, Integer> outputStream : numberOfAttributes.entrySet()) { + final int numAtt = outputStream.getValue(); + assert (numAtt >= -1); + + if (numAtt > 25) { + throw new UnsupportedOperationException( + "Flink cannot handle more then 25 attributes, but " + numAtt + + " are declared for stream '" + outputStream.getKey() + + "' by the given bolt"); + } else if (numAtt >= 0) { + try { + this.outputTuple.put(outputStream.getKey(), + org.apache.flink.api.java.tuple.Tuple.getTupleClass(numAtt) + .newInstance()); + } catch (final InstantiationException e) { + throw new RuntimeException(e); + } catch (final IllegalAccessException e) { + throw new RuntimeException(e); + } + + } + } + } + + /** + * Transforms a Storm tuple into a Flink tuple of type {@code OUT} and emits this tuple via {@link #doEmit(Object)} + * to the specified output stream. + * + * @param The + * The output stream id. + * @param tuple + * The Storm tuple to be emitted. + * @return the return value of {@link #doEmit(Object)} + */ + @SuppressWarnings("unchecked") + protected final List<Integer> tansformAndEmit(final String streamId, final List<Object> tuple) { + List<Integer> taskIds; + + final int numAtt = this.numberOfAttributes.get(streamId); + if (numAtt > -1) { + assert (tuple.size() == numAtt); + Tuple out = this.outputTuple.get(streamId); + for (int i = 0; i < numAtt; ++i) { + out.setField(tuple.get(i), i); + } + if (this.split) { + this.splitTuple.streamId = streamId; + this.splitTuple.value = out; + + taskIds = doEmit((OUT) this.splitTuple); + } else { + taskIds = doEmit((OUT) out); + } + + } else { + assert (tuple.size() == 1); + if (split) { + this.splitTuple.streamId = streamId; + this.splitTuple.value = tuple.get(0); + + taskIds = doEmit((OUT) this.splitTuple); + } else { + taskIds = doEmit((OUT) tuple.get(0)); + } + } + this.tupleEmitted = true; + + return taskIds; + } + + /** + * Emits a Flink tuple. + * + * @param flinkTuple + * The tuple to be emitted. + * @return the IDs of the tasks this tuple was sent to + */ + protected abstract List<Integer> doEmit(OUT flinkTuple); + +}
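[Editor's note] For readers skimming the diff: the core of AbstractStormCollector above is the mapping from a stream's declared attribute count to a concrete Flink tuple type. The following standalone sketch is not part of the commit (the class and method names are made up for illustration); it reproduces that mapping under the same assumptions, using the Flink APIs the collector itself relies on, Tuple.getTupleClass(int) and Tuple.setField(Object, int).

    import org.apache.flink.api.java.tuple.Tuple;

    import java.util.Arrays;
    import java.util.List;

    // Standalone sketch (not part of the commit): illustrates the attribute-count to
    // Flink-tuple mapping performed by AbstractStormCollector. A count in [0;25] selects
    // Tuple0..Tuple25; a negative count stands for the "raw" mode, in which the single
    // attribute is forwarded unchanged.
    final class TupleMappingSketch {

        static Object toFlinkTuple(final List<Object> stormAttributes, final int numberOfAttributes)
                throws Exception {
            if (numberOfAttributes < 0) {
                return stormAttributes.get(0); // raw mode: exactly one attribute, emitted as-is
            }
            // Tuple.getTupleClass(n) yields Tuple0..Tuple25; instantiate it and copy the attributes over
            final Tuple out = Tuple.getTupleClass(numberOfAttributes).newInstance();
            for (int i = 0; i < numberOfAttributes; ++i) {
                out.setField(stormAttributes.get(i), i); // copy attribute i into field i
            }
            return out;
        }

        public static void main(final String[] args) throws Exception {
            System.out.println(toFlinkTuple(Arrays.<Object>asList("word", 1), 2)); // prints (word,1)
            System.out.println(toFlinkTuple(Arrays.<Object>asList("raw"), -1));    // prints raw
        }
    }

Note the design choice this reflects: a negative attribute count bypasses the tuple wrapping entirely, which is why the raw branch of tansformAndEmit above only asserts that the Storm tuple carries exactly one value before handing it to doEmit(Object).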
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltCollector.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltCollector.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltCollector.java new file mode 100644 index 0000000..58fd098 --- /dev/null +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltCollector.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.storm.wrappers; + +import backtype.storm.task.IOutputCollector; +import backtype.storm.tuple.Tuple; + +import org.apache.flink.api.java.tuple.Tuple0; +import org.apache.flink.api.java.tuple.Tuple25; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.util.Collector; + +import java.util.Collection; +import java.util.HashMap; +import java.util.List; + +/** + * A {@link BoltCollector} is used by {@link BoltWrapper} to provided an Storm compatible + * output collector to the wrapped bolt. It transforms the emitted Storm tuples into Flink tuples + * and emits them via the provide {@link Output} object. + */ +class BoltCollector<OUT> extends AbstractStormCollector<OUT> implements IOutputCollector { + + /** The Flink output Collector */ + private final Collector<OUT> flinkOutput; + + /** + * Instantiates a new {@link BoltCollector} that emits Flink tuples to the given Flink output object. If the + * number of attributes is negative, any output type is supported (ie, raw type). If the number of attributes is + * between 0 and 25, the output type is {@link Tuple0} to {@link Tuple25}, respectively. + * + * @param numberOfAttributes + * The number of attributes of the emitted tuples per output stream. + * @param flinkOutput + * The Flink output object to be used. 
+ * @throws UnsupportedOperationException + * if the specified number of attributes is greater than 25 + */ + BoltCollector(final HashMap<String, Integer> numberOfAttributes, + final Collector<OUT> flinkOutput) throws UnsupportedOperationException { + super(numberOfAttributes); + assert (flinkOutput != null); + this.flinkOutput = flinkOutput; + } + + @Override + protected List<Integer> doEmit(final OUT flinkTuple) { + this.flinkOutput.collect(flinkTuple); + // TODO + return null; + } + + @Override + public void reportError(final Throwable error) { + // not sure, if Flink can support this + } + + @Override + public List<Integer> emit(final String streamId, final Collection<Tuple> anchors, final List<Object> tuple) { + return this.tansformAndEmit(streamId, tuple); + } + + @Override + public void emitDirect(final int taskId, final String streamId, final Collection<Tuple> anchors, final List<Object> tuple) { + throw new UnsupportedOperationException("Direct emit is not supported by Flink"); + } + + @Override + public void ack(final Tuple input) {} + + @Override + public void fail(final Tuple input) {} + +} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java new file mode 100644 index 0000000..b16fc09 --- /dev/null +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.storm.wrappers; + +import java.util.Collection; +import java.util.HashMap; + +import backtype.storm.generated.StormTopology; +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.IRichBolt; +import backtype.storm.tuple.Fields; + +import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters; +import org.apache.flink.api.java.tuple.Tuple0; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple25; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.storm.util.SplitStreamType; +import org.apache.flink.storm.util.StormConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import com.google.common.collect.Sets; + +/** + * A {@link BoltWrapper} wraps an {@link IRichBolt} in order to execute the Storm bolt within a Flink Streaming + * program. It takes the Flink input tuples of type {@code IN} and transforms them into {@link StormTuple}s that the + * bolt can process. Furthermore, it takes the bolt's output tuples and transforms them into Flink tuples of type + * {@code OUT} (see {@link AbstractStormCollector} for supported types).<br /> + * <br /> + * <strong>CAUTION: currently, only simple bolts are supported! (ie, bolts that do not use the Storm configuration + * <code>Map</code> or <code>TopologyContext</code> that is provided by the bolt's <code>open(..)</code> method. + * Furthermore, acking and failing of tuples as well as accessing tuple attributes by field names is not supported so + * far.</strong> + */ +public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements OneInputStreamOperator<IN, OUT> { + private static final long serialVersionUID = -4788589118464155835L; + + /** The wrapped Storm {@link IRichBolt bolt}. */ + private final IRichBolt bolt; + /** Number of attributes of the bolt's output tuples per stream. */ + private final HashMap<String, Integer> numberOfAttributes; + /** The schema (ie, ordered field names) of the input stream. */ + private final Fields inputSchema; + /** The original Storm topology. */ + protected StormTopology stormTopology; + /** + * We have to use this because Operators must output + * {@link org.apache.flink.streaming.runtime.streamrecord.StreamRecord}. + */ + private TimestampedCollector<OUT> flinkCollector; + + /** + * Instantiates a new {@link BoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be + * used within a Flink streaming program. As no input schema is defined, attribute-by-name access in only possible + * for POJO input types. The output type will be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's + * declared number of attributes. + * + * @param bolt + * The Storm {@link IRichBolt bolt} to be used. + * @throws IllegalArgumentException + * If the number of declared output attributes is not with range [0;25]. + */ + public BoltWrapper(final IRichBolt bolt) throws IllegalArgumentException { + this(bolt, null, (Collection<String>) null); + } + + /** + * Instantiates a new {@link BoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be + * used within a Flink streaming program. 
The given input schema enable attribute-by-name access for input types + * {@link Tuple0} to {@link Tuple25}. The output type will be one of {@link Tuple0} to {@link Tuple25} depending on + * the bolt's declared number of attributes. + * + * @param bolt + * The Storm {@link IRichBolt bolt} to be used. + * @param inputSchema + * The schema (ie, ordered field names) of the input stream. + * @throws IllegalArgumentException + * If the number of declared output attributes is not with range [0;25]. + */ + public BoltWrapper(final IRichBolt bolt, final Fields inputSchema) + throws IllegalArgumentException { + this(bolt, inputSchema, (Collection<String>) null); + } + + /** + * Instantiates a new {@link BoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be + * used within a Flink streaming program. As no input schema is defined, attribute-by-name access in only possible + * for POJO input types. The output type can be any type if parameter {@code rawOutput} is {@code true} and the + * bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will be one + * of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes. + * + * @param bolt + * The Storm {@link IRichBolt bolt} to be used. + * @param rawOutputs + * Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be + * of a raw type. + * @throws IllegalArgumentException + * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if + * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range + * [1;25]. + */ + public BoltWrapper(final IRichBolt bolt, final String[] rawOutputs) + throws IllegalArgumentException { + this(bolt, null, Sets.newHashSet(rawOutputs)); + } + + /** + * Instantiates a new {@link BoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be + * used within a Flink streaming program. As no input schema is defined, attribute-by-name access in only possible + * for POJO input types. The output type can be any type if parameter {@code rawOutput} is {@code true} and the + * bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will be one + * of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes. + * + * @param bolt + * The Storm {@link IRichBolt bolt} to be used. + * @param rawOutputs + * Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be + * of a raw type. + * @throws IllegalArgumentException + * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if + * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range + * [1;25]. + */ + public BoltWrapper(final IRichBolt bolt, final Collection<String> rawOutputs) + throws IllegalArgumentException { + this(bolt, null, rawOutputs); + } + + /** + * Instantiates a new {@link BoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be + * used within a Flink streaming program. The given input schema enable attribute-by-name access for input types + * {@link Tuple0} to {@link Tuple25}. The output type can be any type if parameter {@code rawOutput} is {@code true} + * and the bolt's number of declared output tuples is 1. 
If {@code rawOutput} is {@code false} the output type will + * be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes. + * + * @param bolt + * The Storm {@link IRichBolt bolt} to be used. + * @param inputSchema + * The schema (ie, ordered field names) of the input stream. + * @param rawOutputs + * Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be + * of a raw type. + * @throws IllegalArgumentException + * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if + * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range + * [0;25]. + */ + public BoltWrapper(final IRichBolt bolt, final Fields inputSchema, + final String[] rawOutputs) throws IllegalArgumentException { + this(bolt, inputSchema, Sets.newHashSet(rawOutputs)); + } + + /** + * Instantiates a new {@link BoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be + * used within a Flink streaming program. The given input schema enable attribute-by-name access for input types + * {@link Tuple0} to {@link Tuple25}. The output type can be any type if parameter {@code rawOutput} is {@code true} + * and the bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will + * be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes. + * + * @param bolt + * The Storm {@link IRichBolt bolt} to be used. + * @param inputSchema + * The schema (ie, ordered field names) of the input stream. + * @param rawOutputs + * Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be + * of a raw type. + * @throws IllegalArgumentException + * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if + * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range + * [0;25]. + */ + public BoltWrapper(final IRichBolt bolt, final Fields inputSchema, + final Collection<String> rawOutputs) throws IllegalArgumentException { + this.bolt = bolt; + this.inputSchema = inputSchema; + this.numberOfAttributes = WrapperSetupHelper.getNumberOfAttributes(bolt, rawOutputs); + } + + /** + * Sets the original Storm topology. + * + * @param stormTopology + * The original Storm topology. 
+ */ + public void setStormTopology(StormTopology stormTopology) { + this.stormTopology = stormTopology; + } + + @Override + public void open(final Configuration parameters) throws Exception { + super.open(parameters); + + this.flinkCollector = new TimestampedCollector<OUT>(output); + OutputCollector stormCollector = null; + + if (this.numberOfAttributes.size() > 0) { + stormCollector = new OutputCollector(new BoltCollector<OUT>( + this.numberOfAttributes, flinkCollector)); + } + + GlobalJobParameters config = super.executionConfig.getGlobalJobParameters(); + StormConfig stormConfig = new StormConfig(); + + if (config != null) { + if (config instanceof StormConfig) { + stormConfig = (StormConfig) config; + } else { + stormConfig.putAll(config.toMap()); + } + } + + final TopologyContext topologyContext = WrapperSetupHelper.createTopologyContext( + super.runtimeContext, this.bolt, this.stormTopology, stormConfig); + + this.bolt.prepare(stormConfig, topologyContext, stormCollector); + } + + @Override + public void dispose() { + this.bolt.cleanup(); + } + + @SuppressWarnings("unchecked") + @Override + public void processElement(final StreamRecord<IN> element) throws Exception { + this.flinkCollector.setTimestamp(element.getTimestamp()); + IN value = element.getValue(); + if (value instanceof SplitStreamType) { + this.bolt.execute(new StormTuple<IN>(((SplitStreamType<IN>) value).value, + inputSchema)); + } else { + this.bolt.execute(new StormTuple<IN>(value, inputSchema)); + } + } + + @Override + public void processWatermark(Watermark mark) throws Exception { + this.output.emitWatermark(mark); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java new file mode 100644 index 0000000..68368bf --- /dev/null +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.storm.wrappers; + +import backtype.storm.generated.StormTopology; +import backtype.storm.hooks.ITaskHook; +import backtype.storm.metric.api.CombinedMetric; +import backtype.storm.metric.api.ICombiner; +import backtype.storm.metric.api.IMetric; +import backtype.storm.metric.api.IReducer; +import backtype.storm.metric.api.ReducedMetric; +import backtype.storm.state.ISubscribedState; +import backtype.storm.task.TopologyContext; +import backtype.storm.tuple.Fields; + +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import clojure.lang.Atom; + +/** + * {@link FlinkTopologyContext} is a {@link TopologyContext} that overwrites certain method that are not applicable when + * a Storm topology is executed within Flink. + */ +final class FlinkTopologyContext extends TopologyContext { + + /** + * Instantiates a new {@link FlinkTopologyContext} for a given Storm topology. The context object is instantiated + * for each parallel task + */ + FlinkTopologyContext(final StormTopology topology, + @SuppressWarnings("rawtypes") final Map stormConf, + final Map<Integer, String> taskToComponent, final Map<String, List<Integer>> componentToSortedTasks, + final Map<String, Map<String, Fields>> componentToStreamToFields, final String stormId, final String codeDir, + final String pidDir, final Integer taskId, final Integer workerPort, final List<Integer> workerTasks, + final Map<String, Object> defaultResources, final Map<String, Object> userResources, + final Map<String, Object> executorData, @SuppressWarnings("rawtypes") final Map registeredMetrics, + final Atom openOrPrepareWasCalled) { + super(topology, stormConf, taskToComponent, componentToSortedTasks, componentToStreamToFields, stormId, + codeDir, pidDir, taskId, workerPort, workerTasks, defaultResources, userResources, executorData, + registeredMetrics, openOrPrepareWasCalled); + } + + /** + * Not supported by Flink. + * + * @throws UnsupportedOperationException + * at every invocation + */ + @Override + public void addTaskHook(final ITaskHook hook) { + throw new UnsupportedOperationException("Task hooks are not supported by Flink"); + } + + /** + * Not supported by Flink. + * + * @throws UnsupportedOperationException + * at every invocation + */ + @Override + public Collection<ITaskHook> getHooks() { + throw new UnsupportedOperationException("Task hooks are not supported by Flink"); + } + + /** + * Not supported by Flink. + * + * @throws UnsupportedOperationException + * at every invocation + */ + @Override + public IMetric getRegisteredMetricByName(final String name) { + throw new UnsupportedOperationException("Metrics are not supported by Flink"); + + } + + /** + * Not supported by Flink. + * + * @throws UnsupportedOperationException + * at every invocation + */ + @SuppressWarnings("rawtypes") + @Override + public CombinedMetric registerMetric(final String name, final ICombiner combiner, final int timeBucketSizeInSecs) { + throw new UnsupportedOperationException("Metrics are not supported by Flink"); + } + + /** + * Not supported by Flink. + * + * @throws UnsupportedOperationException + * at every invocation + */ + @SuppressWarnings("rawtypes") + @Override + public ReducedMetric registerMetric(final String name, final IReducer combiner, final int timeBucketSizeInSecs) { + throw new UnsupportedOperationException("Metrics are not supported by Flink"); + } + + /** + * Not supported by Flink. 
+ * + * @throws UnsupportedOperationException + * at every invocation + */ + @SuppressWarnings("unchecked") + @Override + public IMetric registerMetric(final String name, final IMetric metric, final int timeBucketSizeInSecs) { + throw new UnsupportedOperationException("Metrics are not supported by Flink"); + } + + /** + * Not supported by Flink. + * + * @throws UnsupportedOperationException + * at every invocation + */ + @Override + public <T extends ISubscribedState> T setAllSubscribedState(final T obj) { + throw new UnsupportedOperationException("Not supported by Flink"); + + } + + /** + * Not supported by Flink. + * + * @throws UnsupportedOperationException + * at every invocation + */ + @Override + public <T extends ISubscribedState> T setSubscribedState(final String componentId, final T obj) { + throw new UnsupportedOperationException("Not supported by Flink"); + } + + /** + * Not supported by Flink. + * + * @throws UnsupportedOperationException + * at every invocation + */ + @Override + public <T extends ISubscribedState> T setSubscribedState(final String componentId, final String streamId, final T + obj) { + throw new UnsupportedOperationException("Not supported by Flink"); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarer.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarer.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarer.java new file mode 100644 index 0000000..507305b --- /dev/null +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarer.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.storm.wrappers; + +import java.util.HashMap; + +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.tuple.Fields; +import backtype.storm.utils.Utils; + +/** + * {@link SetupOutputFieldsDeclarer} is used by {@link WrapperSetupHelper} to determine the output streams and + * number of attributes declared by the wrapped spout's or bolt's {@code declare(...)}/{@code declareStream(...)} + * method. + */ +class SetupOutputFieldsDeclarer implements OutputFieldsDeclarer { + + /** The declared output streams and schemas. */ + HashMap<String, Fields> outputStreams = new HashMap<String, Fields>(); + /** The number of attributes for each declared stream by the wrapped operator. 
*/ + HashMap<String, Integer> outputSchemas = new HashMap<String, Integer>(); + + @Override + public void declare(final Fields fields) { + this.declareStream(Utils.DEFAULT_STREAM_ID, false, fields); + } + + @Override + public void declare(final boolean direct, final Fields fields) { + this.declareStream(Utils.DEFAULT_STREAM_ID, direct, fields); + } + + @Override + public void declareStream(final String streamId, final Fields fields) { + this.declareStream(streamId, false, fields); + } + + @Override + public void declareStream(final String streamId, final boolean direct, final Fields fields) { + if (streamId == null) { + throw new IllegalArgumentException("Stream ID cannot be null."); + } + if (direct) { + throw new UnsupportedOperationException("Direct emit is not supported by Flink"); + } + + this.outputStreams.put(streamId, fields); + this.outputSchemas.put(streamId, fields.size()); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutCollector.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutCollector.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutCollector.java new file mode 100644 index 0000000..91fc090 --- /dev/null +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutCollector.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.storm.wrappers; + +import backtype.storm.spout.ISpoutOutputCollector; + +import org.apache.flink.api.java.tuple.Tuple0; +import org.apache.flink.api.java.tuple.Tuple25; +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; + +import java.util.HashMap; +import java.util.List; + +/** + * A {@link SpoutCollector} is used by {@link AbstractStormSpoutWrapper} to provided an Storm + * compatible output collector to the wrapped spout. It transforms the emitted Storm tuples into + * Flink tuples and emits them via the provide {@link SourceContext} object. + */ +class SpoutCollector<OUT> extends AbstractStormCollector<OUT> implements ISpoutOutputCollector { + + /** The Flink source context object */ + private final SourceContext<OUT> flinkContext; + + /** + * Instantiates a new {@link SpoutCollector} that emits Flink tuples to the given Flink source context. If the + * number of attributes is specified as zero, any output type is supported. If the number of attributes is between 0 + * to 25, the output type is {@link Tuple0} to {@link Tuple25}, respectively. + * + * @param numberOfAttributes + * The number of attributes of the emitted tuples. 
+ * @param flinkContext + * The Flink source context to be used. + * @throws UnsupportedOperationException + * if the specified number of attributes is greater than 25 + */ + SpoutCollector(final HashMap<String, Integer> numberOfAttributes, + final SourceContext<OUT> flinkContext) throws UnsupportedOperationException { + super(numberOfAttributes); + assert (flinkContext != null); + this.flinkContext = flinkContext; + } + + @Override + protected List<Integer> doEmit(final OUT flinkTuple) { + this.flinkContext.collect(flinkTuple); + // TODO + return null; + } + + @Override + public void reportError(final Throwable error) { + // not sure, if Flink can support this + } + + @Override + public List<Integer> emit(final String streamId, final List<Object> tuple, final Object messageId) { + return this.tansformAndEmit(streamId, tuple); + } + + + @Override + public void emitDirect(final int taskId, final String streamId, final List<Object> tuple, final Object messageId) { + throw new UnsupportedOperationException("Direct emit is not supported by Flink"); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java new file mode 100644 index 0000000..914a19d --- /dev/null +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.storm.wrappers; + +import java.util.Collection; +import java.util.HashMap; + +import backtype.storm.generated.StormTopology; +import backtype.storm.spout.SpoutOutputCollector; +import backtype.storm.topology.IRichSpout; + +import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters; +import org.apache.flink.api.java.tuple.Tuple0; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple25; +import org.apache.flink.storm.util.FiniteSpout; +import org.apache.flink.storm.util.StormConfig; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; + +import com.google.common.collect.Sets; + +/** + * A {@link SpoutWrapper} wraps an {@link IRichSpout} in order to execute it within a Flink Streaming program. 
It + * takes the spout's output tuples and transforms them into Flink tuples of type {@code OUT} (see + * {@link SpoutCollector} for supported types).<br /> + * <br /> + * Per default, {@link SpoutWrapper} calls the wrapped spout's {@link IRichSpout#nextTuple() nextTuple()} method in + * an infinite loop.<br /> + * Alternatively, {@link SpoutWrapper} can call {@link IRichSpout#nextTuple() nextTuple()} for a finite number of + * times and terminate automatically afterwards (for finite input streams). The number of {@code nextTuple()} calls can + * be specified as a certain number of invocations or can be undefined. In the undefined case, {@link SpoutWrapper} + * terminates if no record was emitted to the output collector for the first time during a call to + * {@link IRichSpout#nextTuple() nextTuple()}.<br /> + * If the given spout implements {@link FiniteSpout} interface and {@link #numberOfInvocations} is not provided or + * is {@code null}, {@link SpoutWrapper} calls {@link IRichSpout#nextTuple() nextTuple()} method until + * {@link FiniteSpout#reachedEnd()} returns true. + */ +public final class SpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> { + private static final long serialVersionUID = -218340336648247605L; + + /** Number of attributes of the spouts's output tuples per stream. */ + private final HashMap<String, Integer> numberOfAttributes; + /** The wrapped {@link IRichSpout spout}. */ + private final IRichSpout spout; + /** The wrapper of the given Flink collector. */ + private SpoutCollector<OUT> collector; + /** Indicates, if the source is still running or was canceled. */ + private volatile boolean isRunning = true; + /** The number of {@link IRichSpout#nextTuple()} calls. */ + private Integer numberOfInvocations; // do not use int -> null indicates an infinite loop + /** The original Storm topology. */ + private StormTopology stormTopology; + + /** + * Instantiates a new {@link SpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()} method of + * the given {@link IRichSpout spout} in an infinite loop. The output type will be one of {@link Tuple0} to + * {@link Tuple25} depending on the spout's declared number of attributes. + * + * @param spout + * The {@link IRichSpout spout} to be used. + * @throws IllegalArgumentException + * If the number of declared output attributes is not with range [0;25]. + */ + public SpoutWrapper(final IRichSpout spout) throws IllegalArgumentException { + this(spout, (Collection<String>) null, null); + } + + /** + * Instantiates a new {@link SpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()} method of + * the given {@link IRichSpout spout} a finite number of times. The output type will be one of {@link Tuple0} to + * {@link Tuple25} depending on the spout's declared number of attributes. + * + * @param spout + * The {@link IRichSpout spout} to be used. + * @param numberOfInvocations + * The number of calls to {@link IRichSpout#nextTuple()}. If value is negative, {@link SpoutWrapper} + * terminates if no tuple was emitted for the first time. If value is {@code null}, finite invocation is + * disabled. + * @throws IllegalArgumentException + * If the number of declared output attributes is not with range [0;25]. 
+ */ + public SpoutWrapper(final IRichSpout spout, final Integer numberOfInvocations) + throws IllegalArgumentException { + this(spout, (Collection<String>) null, numberOfInvocations); + } + + /** + * Instantiates a new {@link SpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()} method of + * the given {@link IRichSpout spout} in an infinite loop. The output type can be any type if parameter + * {@code rawOutput} is {@code true} and the spout's number of declared output tuples is 1. If {@code rawOutput} is + * {@code false} the output type will be one of {@link Tuple0} to {@link Tuple25} depending on the spout's declared + * number of attributes. + * + * @param spout + * The {@link IRichSpout spout} to be used. + * @param rawOutputs + * Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be + * of a raw type. (Can be {@code null}.) + * @throws IllegalArgumentException + * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if + * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range + * [0;25]. + */ + public SpoutWrapper(final IRichSpout spout, final String[] rawOutputs) + throws IllegalArgumentException { + this(spout, Sets.newHashSet(rawOutputs), null); + } + + /** + * Instantiates a new {@link SpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()} method of + * the given {@link IRichSpout spout} a finite number of times. The output type can be any type if parameter + * {@code rawOutput} is {@code true} and the spout's number of declared output tuples is 1. If {@code rawOutput} is + * {@code false} the output type will be one of {@link Tuple0} to {@link Tuple25} depending on the spout's declared + * number of attributes. + * + * @param spout + * The {@link IRichSpout spout} to be used. + * @param rawOutputs + * Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be + * of a raw type. (Can be {@code null}.) + * @param numberOfInvocations + * The number of calls to {@link IRichSpout#nextTuple()}. If value is negative, {@link SpoutWrapper} + * terminates if no tuple was emitted for the first time. If value is {@code null}, finite invocation is + * disabled. + * @throws IllegalArgumentException + * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if + * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range + * [0;25]. + */ + public SpoutWrapper(final IRichSpout spout, final String[] rawOutputs, + final Integer numberOfInvocations) throws IllegalArgumentException { + this(spout, Sets.newHashSet(rawOutputs), numberOfInvocations); + } + + /** + * Instantiates a new {@link SpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()} method of + * the given {@link IRichSpout spout} in an infinite loop. The output type can be any type if parameter + * {@code rawOutput} is {@code true} and the spout's number of declared output tuples is 1. If {@code rawOutput} is + * {@code false} the output type will be one of {@link Tuple0} to {@link Tuple25} depending on the spout's declared + * number of attributes. + * + * @param spout + * The {@link IRichSpout spout} to be used. + * @param rawOutputs + * Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be + * of a raw type. (Can be {@code null}.) 
+ * @throws IllegalArgumentException + * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if + * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range + * [0;25]. + */ + public SpoutWrapper(final IRichSpout spout, final Collection<String> rawOutputs) + throws IllegalArgumentException { + this(spout, rawOutputs, null); + } + + /** + * Instantiates a new {@link SpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()} method of + * the given {@link IRichSpout spout} a finite number of times. The output type can be any type if parameter + * {@code rawOutput} is {@code true} and the spout's number of declared output tuples is 1. If {@code rawOutput} is + * {@code false} the output type will be one of {@link Tuple0} to {@link Tuple25} depending on the spout's declared + * number of attributes. + * + * @param spout + * The {@link IRichSpout spout} to be used. + * @param rawOutputs + * Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be + * of a raw type. (Can be {@code null}.) + * @param numberOfInvocations + * The number of calls to {@link IRichSpout#nextTuple()}. If value is negative, {@link SpoutWrapper} + * terminates if no tuple was emitted for the first time. If value is {@code null}, finite invocation is + * disabled. + * @throws IllegalArgumentException + * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if + * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range + * [0;25]. + */ + public SpoutWrapper(final IRichSpout spout, final Collection<String> rawOutputs, + final Integer numberOfInvocations) throws IllegalArgumentException { + this.spout = spout; + this.numberOfAttributes = WrapperSetupHelper.getNumberOfAttributes(spout, rawOutputs); + this.numberOfInvocations = numberOfInvocations; + } + + /** + * Sets the original Storm topology. + * + * @param stormTopology + * The original Storm topology. 
+ */ + public void setStormTopology(StormTopology stormTopology) { + this.stormTopology = stormTopology; + } + + @Override + public final void run(final SourceContext<OUT> ctx) throws Exception { + this.collector = new SpoutCollector<OUT>(this.numberOfAttributes, ctx); + + GlobalJobParameters config = super.getRuntimeContext().getExecutionConfig() + .getGlobalJobParameters(); + StormConfig stormConfig = new StormConfig(); + + if (config != null) { + if (config instanceof StormConfig) { + stormConfig = (StormConfig) config; + } else { + stormConfig.putAll(config.toMap()); + } + } + + this.spout.open(stormConfig, WrapperSetupHelper.createTopologyContext( + (StreamingRuntimeContext) super.getRuntimeContext(), this.spout, + this.stormTopology, stormConfig), new SpoutOutputCollector(this.collector)); + this.spout.activate(); + + if (numberOfInvocations == null) { + if (this.spout instanceof FiniteSpout) { + final FiniteSpout finiteSpout = (FiniteSpout) this.spout; + + while (this.isRunning && !finiteSpout.reachedEnd()) { + finiteSpout.nextTuple(); + } + } else { + while (this.isRunning) { + this.spout.nextTuple(); + } + } + } else { + int counter = this.numberOfInvocations; + if (counter >= 0) { + while ((--counter >= 0) && this.isRunning) { + this.spout.nextTuple(); + } + } else { + do { + this.collector.tupleEmitted = false; + this.spout.nextTuple(); + } while (this.collector.tupleEmitted && this.isRunning); + } + } + } + + /** + * {@inheritDoc} + * <p/> + * Sets the {@link #isRunning} flag to {@code false}. + */ + @Override + public void cancel() { + this.isRunning = false; + } + + @Override + public void close() throws Exception { + this.spout.close(); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/StormTuple.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/StormTuple.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/StormTuple.java new file mode 100644 index 0000000..c9ab8e5 --- /dev/null +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/StormTuple.java @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.storm.wrappers; + +/* + * We do neither import + * backtype.storm.tuple.Tuple; + * nor + * org.apache.flink.api.java.tuple.Tuple + * to avoid confusion + */ + +import backtype.storm.generated.GlobalStreamId; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.MessageId; +import backtype.storm.tuple.Values; + +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.List; + +/** + * {@link StormTuple} converts a Flink tuple of type {@code IN} into a Storm tuple. + */ +class StormTuple<IN> implements backtype.storm.tuple.Tuple { + + /** The Storm representation of the original Flink tuple */ + private final Values stormTuple; + /** The schema (ie, ordered field names) of the tuple */ + private final Fields schema; + + /** + * Create a new Storm tuple from the given Flink tuple. The provided {@code nameIndexMap} is ignored for raw input + * types. + * + * @param flinkTuple + * The Flink tuple to be converted. + * @param schema + * The schema (ie, ordered field names) of the tuple. + */ + StormTuple(final IN flinkTuple, final Fields schema) { + if (flinkTuple instanceof org.apache.flink.api.java.tuple.Tuple) { + this.schema = schema; + final org.apache.flink.api.java.tuple.Tuple t = (org.apache.flink.api.java.tuple.Tuple) flinkTuple; + + final int numberOfAttributes = t.getArity(); + this.stormTuple = new Values(); + for (int i = 0; i < numberOfAttributes; ++i) { + this.stormTuple.add(t.getField(i)); + } + } else { + this.schema = null; + this.stormTuple = new Values(flinkTuple); + } + } + + @Override + public int size() { + return this.stormTuple.size(); + } + + @Override + public boolean contains(final String field) { + if (this.schema != null) { + return this.schema.contains(field); + } + + try { + this.getPublicMemberField(field); + return true; + } catch (NoSuchFieldException f) { + try { + this.getGetterMethod(field); + return true; + } catch (Exception g) { + return false; + } + } catch (Exception e) { + return false; + } + } + + @Override + public Fields getFields() { + return this.schema; + } + + @Override + public int fieldIndex(final String field) { + return this.schema.fieldIndex(field); + } + + @Override + public List<Object> select(final Fields selector) { + return this.schema.select(selector, this.stormTuple); + } + + @Override + public Object getValue(final int i) { + return this.stormTuple.get(i); + } + + @Override + public String getString(final int i) { + return (String) this.stormTuple.get(i); + } + + @Override + public Integer getInteger(final int i) { + return (Integer) this.stormTuple.get(i); + } + + @Override + public Long getLong(final int i) { + return (Long) this.stormTuple.get(i); + } + + @Override + public Boolean getBoolean(final int i) { + return (Boolean) this.stormTuple.get(i); + } + + @Override + public Short getShort(final int i) { + return (Short) this.stormTuple.get(i); + } + + @Override + public Byte getByte(final int i) { + return (Byte) this.stormTuple.get(i); + } + + @Override + public Double getDouble(final int i) { + return (Double) this.stormTuple.get(i); + } + + @Override + public Float getFloat(final int i) { + return (Float) this.stormTuple.get(i); + } + + @Override + public byte[] getBinary(final int i) { + return (byte[]) this.stormTuple.get(i); + } + + private Field getPublicMemberField(final String field) throws Exception { + assert (this.stormTuple.size() == 1); + return this.stormTuple.get(0).getClass().getField(field); + } + + private Method getGetterMethod(final String 
field) throws Exception { + assert (this.stormTuple.size() == 1); + return this.stormTuple + .get(0) + .getClass() + .getMethod("get" + Character.toUpperCase(field.charAt(0)) + field.substring(1), + (Class<?>[]) null); + } + + private Object getValueByPublicMember(final String field) throws Exception { + assert (this.stormTuple.size() == 1); + return getPublicMemberField(field).get(this.stormTuple.get(0)); + } + + private Object getValueByGetter(final String field) throws Exception { + assert (this.stormTuple.size() == 1); + return getGetterMethod(field).invoke(this.stormTuple.get(0), (Object[]) null); + } + + @SuppressWarnings("unchecked") + public <T> T getValueByName(final String field) { + if (this.schema != null) { + return (T) this.getValue(this.schema.fieldIndex(field)); + } + assert (this.stormTuple.size() == 1); + + Exception e; + try { + // try public member + return (T) getValueByPublicMember(field); + } catch (NoSuchFieldException f) { + try { + // try getter-method + return (T) getValueByGetter(field); + } catch (Exception g) { + e = g; + } + } catch (Exception f) { + e = f; + } + + throw new RuntimeException("Could not access field <" + field + ">", e); + } + + @Override + public Object getValueByField(final String field) { + return getValueByName(field); + } + + @Override + public String getStringByField(final String field) { + return getValueByName(field); + } + + @Override + public Integer getIntegerByField(final String field) { + return getValueByName(field); + } + + @Override + public Long getLongByField(final String field) { + return getValueByName(field); + } + + @Override + public Boolean getBooleanByField(final String field) { + return getValueByName(field); + } + + @Override + public Short getShortByField(final String field) { + return getValueByName(field); + } + + @Override + public Byte getByteByField(final String field) { + return getValueByName(field); + } + + @Override + public Double getDoubleByField(final String field) { + return getValueByName(field); + } + + @Override + public Float getFloatByField(final String field) { + return getValueByName(field); + } + + @Override + public byte[] getBinaryByField(final String field) { + return getValueByName(field); + } + + @Override + public List<Object> getValues() { + return this.stormTuple; + } + + @Override + public GlobalStreamId getSourceGlobalStreamid() { + // not sure if Flink can support this + throw new UnsupportedOperationException(); + } + + @Override + public String getSourceComponent() { + // not sure if Flink can support this + throw new UnsupportedOperationException(); + } + + @Override + public int getSourceTask() { + // not sure if Flink can support this + throw new UnsupportedOperationException(); + } + + @Override + public String getSourceStreamId() { + // not sure if Flink can support this + throw new UnsupportedOperationException(); + } + + @Override + public MessageId getMessageId() { + // not sure if Flink can support this + throw new UnsupportedOperationException(); + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = (prime * result) + ((this.stormTuple == null) ? 
0 : this.stormTuple.hashCode()); + return result; + } + + @Override + public boolean equals(final Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (this.getClass() != obj.getClass()) { + return false; + } + final StormTuple<?> other = (StormTuple<?>) obj; + if (this.stormTuple == null) { + if (other.stormTuple != null) { + return false; + } + } else if (!this.stormTuple.equals(other.stormTuple)) { + return false; + } + return true; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java new file mode 100644 index 0000000..d529b6a --- /dev/null +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.storm.wrappers; + +import backtype.storm.Config; +import backtype.storm.generated.Bolt; +import backtype.storm.generated.ComponentCommon; +import backtype.storm.generated.SpoutSpec; +import backtype.storm.generated.StateSpoutSpec; +import backtype.storm.generated.StormTopology; +import backtype.storm.generated.StreamInfo; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.IComponent; +import backtype.storm.topology.IRichBolt; +import backtype.storm.topology.IRichSpout; +import backtype.storm.tuple.Fields; + +import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; + +import clojure.lang.Atom; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +/** + * {@link WrapperSetupHelper} is an helper class used by {@link AbstractStormSpoutWrapper} and + * {@link BoltWrapper}. + */ +class WrapperSetupHelper { + + /** The configuration key for the topology name. */ + final static String TOPOLOGY_NAME = "storm.topology.name"; + + /** + * Computes the number of output attributes used by a {@link AbstractStormSpoutWrapper} or {@link BoltWrapper} + * per declared output stream. The number is {@code -1} for raw output type or a value within range [0;25] for + * output type {@link org.apache.flink.api.java.tuple.Tuple0 Tuple0} to + * {@link org.apache.flink.api.java.tuple.Tuple25 Tuple25}. + * + * @param spoutOrBolt + * The Storm {@link IRichSpout spout} or {@link IRichBolt bolt} to be used. 
+ * @param rawOutputs + * Contains the names of all output streams for which the single output attribute should not be wrapped + * in a {@link org.apache.flink.api.java.tuple.Tuple1 Tuple1} but emitted as a raw type. (Can be {@code null}.) + * @return The number of attributes to be used for each stream. + * @throws IllegalArgumentException + * If the number of declared output attributes of any stream is not within the range [0;25], or if a + * stream listed in {@code rawOutputs} does not declare exactly one output attribute. + */ + static HashMap<String, Integer> getNumberOfAttributes(final IComponent spoutOrBolt, + final Collection<String> rawOutputs) + throws IllegalArgumentException { + final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer(); + spoutOrBolt.declareOutputFields(declarer); + + for (Entry<String, Integer> schema : declarer.outputSchemas.entrySet()) { + int declaredNumberOfAttributes = schema.getValue(); + if ((declaredNumberOfAttributes < 0) || (declaredNumberOfAttributes > 25)) { + throw new IllegalArgumentException( + "Provided bolt declares an unsupported number of output attributes. Must be in range [0;25] but " + + "was " + declaredNumberOfAttributes); + } + + if (rawOutputs != null && rawOutputs.contains(schema.getKey())) { + if (declaredNumberOfAttributes != 1) { + throw new IllegalArgumentException( + "Output type is requested to be raw type, but provided bolt declares more than one output " + + "attribute."); + } + schema.setValue(-1); + } + } + + return declarer.outputSchemas; + } + + /** Used to compute unique task IDs for a Storm topology. */ + private static int tid; + + /** + * Creates a {@link TopologyContext} for a Spout or Bolt instance (ie, Flink task / Storm executor). + * + * @param context + * The Flink runtime context. + * @param spoutOrBolt + * The Spout or Bolt this context is created for. + * @param stormTopology + * The original Storm topology. + * @param stormConfig + * The user-provided configuration. + * @return The created {@link TopologyContext}.
+ */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + static synchronized TopologyContext createTopologyContext( + final StreamingRuntimeContext context, final IComponent spoutOrBolt, + StormTopology stormTopology, Map stormConfig) { + String operatorName = context.getTaskName(); + if (operatorName.startsWith("Source: ")) { + // prefix "Source: " is inserted by Flink sources by default -- need to get rid of it here + operatorName = operatorName.substring(8); + } + final int dop = context.getNumberOfParallelSubtasks(); + + final Map<Integer, String> taskToComponents = new HashMap<Integer, String>(); + final Map<String, List<Integer>> componentToSortedTasks = new HashMap<String, List<Integer>>(); + final Map<String, Map<String, Fields>> componentToStreamToFields = new HashMap<String, Map<String, Fields>>(); + String stormId = (String) stormConfig.get(TOPOLOGY_NAME); + String codeDir = null; // not supported + String pidDir = null; // not supported + Integer taskId = null; + Integer workerPort = null; // not supported + List<Integer> workerTasks = new ArrayList<Integer>(); + final Map<String, Object> defaultResources = new HashMap<String, Object>(); + final Map<String, Object> userResources = new HashMap<String, Object>(); + final Map<String, Object> executorData = new HashMap<String, Object>(); + final Map registeredMetrics = new HashMap(); + Atom openOrPrepareWasCalled = null; + + if (stormTopology == null) { + // embedded mode + ComponentCommon common = new ComponentCommon(); + common.set_parallelism_hint(dop); + + HashMap<String, SpoutSpec> spouts = new HashMap<String, SpoutSpec>(); + HashMap<String, Bolt> bolts = new HashMap<String, Bolt>(); + if (spoutOrBolt instanceof IRichSpout) { + spouts.put(operatorName, new SpoutSpec(null, common)); + } else { + assert (spoutOrBolt instanceof IRichBolt); + bolts.put(operatorName, new Bolt(null, common)); + } + stormTopology = new StormTopology(spouts, bolts, new HashMap<String, StateSpoutSpec>()); + + taskId = context.getIndexOfThisSubtask(); + + List<Integer> sortedTasks = new ArrayList<Integer>(dop); + for (int i = 1; i <= dop; ++i) { + taskToComponents.put(i, operatorName); + sortedTasks.add(i); + } + componentToSortedTasks.put(operatorName, sortedTasks); + + SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer(); + spoutOrBolt.declareOutputFields(declarer); + componentToStreamToFields.put(operatorName, declarer.outputStreams); + } else { + // whole topology is built (ie, FlinkTopologyBuilder is used) + Map<String, SpoutSpec> spouts = stormTopology.get_spouts(); + Map<String, Bolt> bolts = stormTopology.get_bolts(); + Map<String, StateSpoutSpec> stateSpouts = stormTopology.get_state_spouts(); + + tid = 1; + + for (Entry<String, SpoutSpec> spout : spouts.entrySet()) { + Integer rc = processSingleOperator(spout.getKey(), spout.getValue().get_common(), + operatorName, context.getIndexOfThisSubtask(), dop, taskToComponents, + componentToSortedTasks, componentToStreamToFields); + if (rc != null) { + taskId = rc; + } + } + for (Entry<String, Bolt> bolt : bolts.entrySet()) { + Integer rc = processSingleOperator(bolt.getKey(), bolt.getValue().get_common(), + operatorName, context.getIndexOfThisSubtask(), dop, taskToComponents, + componentToSortedTasks, componentToStreamToFields); + if (rc != null) { + taskId = rc; + } + } + for (Entry<String, StateSpoutSpec> stateSpout : stateSpouts.entrySet()) { + Integer rc = processSingleOperator(stateSpout.getKey(), stateSpout + .getValue().get_common(), operatorName, 
context.getIndexOfThisSubtask(), + dop, taskToComponents, componentToSortedTasks, componentToStreamToFields); + if (rc != null) { + taskId = rc; + } + } + assert (taskId != null); + } + + if (!stormConfig.containsKey(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)) { + stormConfig.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 30); // Storm default value + } + + return new FlinkTopologyContext(stormTopology, stormConfig, taskToComponents, + componentToSortedTasks, componentToStreamToFields, stormId, codeDir, pidDir, + taskId, workerPort, workerTasks, defaultResources, userResources, executorData, + registeredMetrics, openOrPrepareWasCalled); + } + + /** + * Sets up {@code taskToComponents}, {@code componentToSortedTasks}, and {@code componentToStreamToFields} for a + * single instance of a Spout or Bolt (ie, task or executor). Furthermore, it computes the unique task ID. + * + * @param componentId + * The ID of the Spout/Bolt in the topology. + * @param common + * The common operator object (that all Spouts and Bolts have). + * @param operatorName + * The Flink operator name. + * @param index + * The index of the currently processed task within its operator. + * @param dop + * The parallelism of the operator. + * @param taskToComponents + * OUTPUT: A map from all task IDs of the topology to their component IDs. + * @param componentToSortedTasks + * OUTPUT: A map from all component IDs to their sorted list of corresponding task IDs. + * @param componentToStreamToFields + * OUTPUT: A map from all component IDs to their output streams and output fields. + * + * @return A unique task ID if the currently processed Spout or Bolt ({@code componentId}) is equal to the current + * Flink operator ({@code operatorName}) -- {@code null} otherwise. + */ + private static Integer processSingleOperator(final String componentId, + final ComponentCommon common, final String operatorName, final int index, + final int dop, final Map<Integer, String> taskToComponents, + final Map<String, List<Integer>> componentToSortedTasks, + final Map<String, Map<String, Fields>> componentToStreamToFields) { + final int parallelism_hint = common.get_parallelism_hint(); + Integer taskId = null; + + if (componentId.equals(operatorName)) { + taskId = tid + index; + } + + List<Integer> sortedTasks = new ArrayList<Integer>(dop); + for (int i = 0; i < parallelism_hint; ++i) { + taskToComponents.put(tid, componentId); + sortedTasks.add(tid); + ++tid; + } + componentToSortedTasks.put(componentId, sortedTasks); + + Map<String, Fields> outputStreams = new HashMap<String, Fields>(); + for (Entry<String, StreamInfo> outStream : common.get_streams().entrySet()) { + outputStreams.put(outStream.getKey(), new Fields(outStream.getValue().get_output_fields())); + } + componentToStreamToFields.put(componentId, outputStreams); + + return taskId; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java new file mode 100644 index 0000000..49de476 --- /dev/null +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.storm.api; + +import backtype.storm.tuple.Fields; +import backtype.storm.utils.Utils; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.storm.api.FlinkOutputFieldsDeclarer; +import org.apache.flink.storm.util.AbstractTest; +import org.junit.Assert; +import org.junit.Test; + +import java.util.LinkedList; + +public class FlinkOutputFieldsDeclarerTest extends AbstractTest { + + + + @Test + public void testNull() { + Assert.assertNull(new FlinkOutputFieldsDeclarer().getOutputType(null)); + } + + @Test + public void testDeclare() { + for (int i = 0; i < 2; ++i) { // test case: simple / non-direct + for (int j = 1; j < 2; ++j) { // number of streams + for (int k = 0; k <= 25; ++k) { // number of attributes + this.runDeclareTest(i, j, k); + } + } + } + } + + @Test(expected = IllegalArgumentException.class) + public void testDeclareSimpleToManyAttributes() { + this.runDeclareTest(0, this.r.nextBoolean() ? 1 : 2, 26); + } + + @Test(expected = IllegalArgumentException.class) + public void testDeclareNonDirectToManyAttributes() { + this.runDeclareTest(1, this.r.nextBoolean() ? 1 : 2, 26); + } + + @Test(expected = IllegalArgumentException.class) + public void testDeclareDefaultStreamToManyAttributes() { + this.runDeclareTest(2, this.r.nextBoolean() ? 1 : 2, 26); + } + + @Test(expected = IllegalArgumentException.class) + public void testDeclareFullToManyAttributes() { + this.runDeclareTest(3, this.r.nextBoolean() ? 
1 : 2, 26); + } + + private void runDeclareTest(final int testCase, final int numberOfStreams, + final int numberOfAttributes) { + final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer(); + + String[] streams = null; + if (numberOfStreams > 1 || r.nextBoolean()) { + streams = new String[numberOfStreams]; + for (int i = 0; i < numberOfStreams; ++i) { + streams[i] = "stream" + i; + } + } + + final String[] attributes = new String[numberOfAttributes]; + for (int i = 0; i < attributes.length; ++i) { + attributes[i] = "a" + i; + } + + switch (testCase) { + case 0: + this.declareSimple(declarer, streams, attributes); + break; + default: + this.declareNonDirect(declarer, streams, attributes); + } + + if (streams == null) { + streams = new String[] { Utils.DEFAULT_STREAM_ID }; + } + + for (String stream : streams) { + final TypeInformation<?> type = declarer.getOutputType(stream); + + if (numberOfAttributes == 1) { + Assert.assertEquals(type.getClass(), GenericTypeInfo.class); + Assert.assertEquals(type.getTypeClass(), Object.class); + } else { + Assert.assertEquals(numberOfAttributes, type.getArity()); + Assert.assertTrue(type.isTupleType()); + } + } + } + + private void declareSimple(final FlinkOutputFieldsDeclarer declarer, final String[] streams, + final String[] attributes) { + + if (streams != null) { + for (String stream : streams) { + declarer.declareStream(stream, new Fields(attributes)); + } + } else { + declarer.declare(new Fields(attributes)); + } + } + + private void declareNonDirect(final FlinkOutputFieldsDeclarer declarer, final String[] streams, + final String[] attributes) { + + if (streams != null) { + for (String stream : streams) { + declarer.declareStream(stream, false, new Fields(attributes)); + } + } else { + declarer.declare(false, new Fields(attributes)); + } + } + + @Test(expected = IllegalArgumentException.class) + public void testUndeclared() { + final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer(); + declarer.getOutputType("unknownStreamId"); + } + + @Test(expected = UnsupportedOperationException.class) + public void testDeclareDirect() { + new FlinkOutputFieldsDeclarer().declare(true, null); + } + + @Test(expected = UnsupportedOperationException.class) + public void testDeclareDirect2() { + new FlinkOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, true, null); + } + + @Test + public void testGetGroupingFieldIndexes() { + final int numberOfAttributes = 5 + this.r.nextInt(21); + final String[] attributes = new String[numberOfAttributes]; + for (int i = 0; i < numberOfAttributes; ++i) { + attributes[i] = "a" + i; + } + + final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer(); + declarer.declare(new Fields(attributes)); + + final int numberOfKeys = 1 + this.r.nextInt(25); + final LinkedList<String> groupingFields = new LinkedList<String>(); + final boolean[] indexes = new boolean[numberOfAttributes]; + + for (int i = 0; i < numberOfAttributes; ++i) { + if (this.r.nextInt(26) < numberOfKeys) { + groupingFields.add(attributes[i]); + indexes[i] = true; + } else { + indexes[i] = false; + } + } + + final int[] expectedResult = new int[groupingFields.size()]; + int j = 0; + for (int i = 0; i < numberOfAttributes; ++i) { + if (indexes[i]) { + expectedResult[j++] = i; + } + } + + final int[] result = declarer.getGroupingFieldIndexes(Utils.DEFAULT_STREAM_ID, + groupingFields); + + Assert.assertEquals(expectedResult.length, result.length); + for (int i = 0; i < expectedResult.length; ++i) { + 
Assert.assertEquals(expectedResult[i], result[i]); + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyBuilderTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyBuilderTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyBuilderTest.java new file mode 100644 index 0000000..e6fb8e5 --- /dev/null +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyBuilderTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.storm.api; + +import org.apache.flink.storm.api.FlinkTopologyBuilder; +import org.apache.flink.storm.util.TestDummyBolt; +import org.apache.flink.storm.util.TestDummySpout; +import org.apache.flink.storm.util.TestSink; +import org.junit.Test; + +import backtype.storm.tuple.Fields; + +public class FlinkTopologyBuilderTest { + + @Test(expected = RuntimeException.class) + public void testUnknowSpout() { + FlinkTopologyBuilder builder = new FlinkTopologyBuilder(); + builder.setSpout("spout", new TestSpout()); + builder.setBolt("bolt", new TestBolt()).shuffleGrouping("unknown"); + builder.createTopology(); + } + + @Test(expected = RuntimeException.class) + public void testUnknowBolt() { + FlinkTopologyBuilder builder = new FlinkTopologyBuilder(); + builder.setSpout("spout", new TestSpout()); + builder.setBolt("bolt1", new TestBolt()).shuffleGrouping("spout"); + builder.setBolt("bolt2", new TestBolt()).shuffleGrouping("unknown"); + builder.createTopology(); + } + + @Test(expected = RuntimeException.class) + public void testUndeclaredStream() { + FlinkTopologyBuilder builder = new FlinkTopologyBuilder(); + builder.setSpout("spout", new TestSpout()); + builder.setBolt("bolt", new TestBolt()).shuffleGrouping("spout"); + builder.createTopology(); + } + + @Test + public void testFieldsGroupingOnMultipleSpoutOutputStreams() { + FlinkTopologyBuilder flinkBuilder = new FlinkTopologyBuilder(); + + flinkBuilder.setSpout("spout", new TestDummySpout()); + flinkBuilder.setBolt("sink", new TestSink()).fieldsGrouping("spout", + TestDummySpout.spoutStreamId, new Fields("id")); + + flinkBuilder.createTopology(); + } + + @Test + public void testFieldsGroupingOnMultipleBoltOutputStreams() { + FlinkTopologyBuilder flinkBuilder = new FlinkTopologyBuilder(); + + flinkBuilder.setSpout("spout", new TestDummySpout()); + flinkBuilder.setBolt("bolt", new TestDummyBolt()).shuffleGrouping("spout"); + flinkBuilder.setBolt("sink", new TestSink()).fieldsGrouping("bolt", + TestDummyBolt.groupingStreamId, new Fields("id")); + + 
flinkBuilder.createTopology(); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyTest.java new file mode 100644 index 0000000..9d04ca5 --- /dev/null +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.storm.api; + +import org.apache.flink.storm.api.FlinkTopology; +import org.junit.Assert; +import org.junit.Test; + +public class FlinkTopologyTest { + + @Test + public void testDefaultParallelism() { + final FlinkTopology topology = new FlinkTopology(); + Assert.assertEquals(1, topology.getParallelism()); + } + + @Test(expected = UnsupportedOperationException.class) + public void testExecute() throws Exception { + new FlinkTopology().execute(); + } + + @Test(expected = UnsupportedOperationException.class) + public void testExecuteWithName() throws Exception { + new FlinkTopology().execute(null); + } + + @Test + public void testNumberOfTasks() { + final FlinkTopology topology = new FlinkTopology(); + + Assert.assertEquals(0, topology.getNumberOfTasks()); + + topology.increaseNumberOfTasks(3); + Assert.assertEquals(3, topology.getNumberOfTasks()); + + topology.increaseNumberOfTasks(2); + Assert.assertEquals(5, topology.getNumberOfTasks()); + + topology.increaseNumberOfTasks(8); + Assert.assertEquals(13, topology.getNumberOfTasks()); + } + + @Test(expected = AssertionError.class) + public void testAssert() { + new FlinkTopology().increaseNumberOfTasks(0); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestBolt.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestBolt.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestBolt.java new file mode 100644 index 0000000..74ea67e --- /dev/null +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestBolt.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.storm.api; + +import java.util.Map; + +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.IRichBolt; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.tuple.Tuple; + +public class TestBolt implements IRichBolt { + private static final long serialVersionUID = -667148827441397683L; + + @SuppressWarnings("rawtypes") + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {} + + @Override + public void execute(Tuple input) {} + + @Override + public void cleanup() {} + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) {} + + @Override + public Map<String, Object> getComponentConfiguration() { + return null; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestSpout.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestSpout.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestSpout.java new file mode 100644 index 0000000..4abb604 --- /dev/null +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestSpout.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.storm.api; + +import java.util.Map; + +import backtype.storm.spout.SpoutOutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.IRichSpout; +import backtype.storm.topology.OutputFieldsDeclarer; + +public class TestSpout implements IRichSpout { + private static final long serialVersionUID = -4884029383198924007L; + + @SuppressWarnings("rawtypes") + @Override + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {} + + @Override + public void close() {} + + @Override + public void activate() {} + + @Override + public void deactivate() {} + + @Override + public void nextTuple() {} + + @Override + public void ack(Object msgId) {} + + @Override + public void fail(Object msgId) {} + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) {} + + @Override + public Map<String, Object> getComponentConfiguration() { + return null; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestTopologyBuilder.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestTopologyBuilder.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestTopologyBuilder.java new file mode 100644 index 0000000..7bea94c --- /dev/null +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestTopologyBuilder.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.storm.api; + +import org.apache.flink.storm.api.FlinkTopologyBuilder; + +import backtype.storm.generated.StormTopology; + +public class TestTopologyBuilder extends FlinkTopologyBuilder { + @Override + public StormTopology getStormTopology() { + return super.getStormTopology(); + } +}
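The name-based access in StormTuple.getValueByName(String) above resolves a field of a raw (non-Tuple) input object either through a public member of the same name or through a matching getter method. The following self-contained sketch illustrates the same lookup pattern; the Sensor POJO and the NamedAccessSketch class are hypothetical and not part of this commit.

import java.lang.reflect.Field;
import java.lang.reflect.Method;

// Hypothetical POJO standing in for a raw (single-attribute) Storm tuple value.
class Sensor {
    public int id = 42;                       // resolvable as a public member
    private final String name = "probe-7";
    public String getName() { return name; }  // resolvable via its getter
}

public class NamedAccessSketch {

    // Mirrors the lookup order of StormTuple.getValueByName(): public field first, getter as fallback.
    static Object valueByName(final Object rawTuple, final String field) throws Exception {
        try {
            final Field member = rawTuple.getClass().getField(field);
            return member.get(rawTuple);
        } catch (NoSuchFieldException e) {
            final Method getter = rawTuple.getClass().getMethod(
                    "get" + Character.toUpperCase(field.charAt(0)) + field.substring(1));
            return getter.invoke(rawTuple);
        }
    }

    public static void main(final String[] args) throws Exception {
        final Sensor raw = new Sensor();
        System.out.println(valueByName(raw, "id"));   // 42, via the public member
        System.out.println(valueByName(raw, "name")); // probe-7, via getName()
    }
}

The typed convenience methods such as getStringByField and getLongByField simply delegate to getValueByName and rely on an unchecked cast of the resolved value.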
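WrapperSetupHelper.getNumberOfAttributes(IComponent, Collection<String>) above maps every declared output stream to its attribute count and marks the streams listed in rawOutputs with -1, so that their single attribute is later emitted as a raw type rather than a Tuple1. Below is a minimal sketch of that schema computation for a bolt with two output streams; the TwoStreamBolt and NumberOfAttributesSketch classes are hypothetical, and because the helper is package-private the sketch assumes it is placed in the org.apache.flink.storm.wrappers package.

package org.apache.flink.storm.wrappers; // assumed: getNumberOfAttributes is package-private

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical bolt declaring one two-attribute stream and one single-attribute stream.
class TwoStreamBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;

    @Override
    public void declareOutputFields(final OutputFieldsDeclarer declarer) {
        declarer.declareStream("pairs", new Fields("key", "value")); // two attributes -> Tuple2
        declarer.declareStream("raw", new Fields("payload"));        // one attribute, requested as raw
    }

    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(final Map stormConf, final TopologyContext context, final OutputCollector collector) {}

    @Override
    public void execute(final Tuple input) {}
}

public class NumberOfAttributesSketch {
    public static void main(final String[] args) {
        final HashMap<String, Integer> schema = WrapperSetupHelper.getNumberOfAttributes(
                new TwoStreamBolt(), Collections.singleton("raw"));
        System.out.println(schema); // e.g. {raw=-1, pairs=2} (iteration order not guaranteed)
    }
}

Under these assumptions the returned map contains 2 for the stream "pairs" and -1 for the stream "raw", matching the [0;25] versus -1 convention documented on getNumberOfAttributes above.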