http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/SplitStreamTypeKeySelector.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/SplitStreamTypeKeySelector.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/SplitStreamTypeKeySelector.java deleted file mode 100644 index 30227b8..0000000 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/SplitStreamTypeKeySelector.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.stormcompatibility.api; - -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.stormcompatibility.util.SplitStreamType; -import org.apache.flink.streaming.util.keys.KeySelectorUtil; -import org.apache.flink.streaming.util.keys.KeySelectorUtil.ArrayKeySelector; - -/** - * {@link SplitStreamTypeKeySelector} is a specific grouping key selector for streams that are selected via - * {@link FlinkStormStreamSelector} from a Spout or Bolt that declares multiple output streams. - * - * It extracts the wrapped {@link Tuple} type from the {@link SplitStreamType} tuples and applies a regular - * {@link ArrayKeySelector} on it. - */ -public class SplitStreamTypeKeySelector implements KeySelector<SplitStreamType<Tuple>, Tuple> { - private static final long serialVersionUID = 4672434660037669254L; - - private final ArrayKeySelector<Tuple> selector; - - public SplitStreamTypeKeySelector(int... fields) { - this.selector = new KeySelectorUtil.ArrayKeySelector<Tuple>(fields); - } - - @Override - public Tuple getKey(SplitStreamType<Tuple> value) throws Exception { - return selector.getKey(value.value); - } - -}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormSpout.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormSpout.java deleted file mode 100644 index 114fa7c..0000000 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormSpout.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.stormcompatibility.util; - -import org.apache.flink.stormcompatibility.wrappers.FiniteStormSpoutWrapper; - -import backtype.storm.topology.IRichSpout; - -/** - * This interface represents a Storm spout that emits a finite number of records. Common Storm - * spouts emit infinite streams by default. To change this behaviour and take advantage of - * Flink's finite-source capabilities, the spout should implement this interface. To wrap - * {@link FiniteStormSpout} separately, use {@link FiniteStormSpoutWrapper}. - */ -public interface FiniteStormSpout extends IRichSpout { - - /** - * When returns true, the spout has reached the end of the stream. - * - * @return true, if the spout's stream reached its end, false otherwise - */ - public boolean reachedEnd(); - -} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkOutputFieldsDeclarer.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkOutputFieldsDeclarer.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkOutputFieldsDeclarer.java deleted file mode 100644 index 3eee8d6..0000000 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkOutputFieldsDeclarer.java +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.stormcompatibility.util; - -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Fields; -import backtype.storm.utils.Utils; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.api.java.typeutils.TypeExtractor; - -import java.util.HashMap; -import java.util.List; - -/** - * {@link FlinkOutputFieldsDeclarer} is used to get the declared output schema of a - * {@link backtype.storm.topology.IRichSpout spout} or {@link backtype.storm.topology.IRichBolt bolt}.<br /> - * <br /> - * <strong>CAUTION: Flink does not support direct emit.</strong> - */ -public final class FlinkOutputFieldsDeclarer implements OutputFieldsDeclarer { - - /** The declared output streams and schemas. */ - public final HashMap<String, Fields> outputStreams = new HashMap<String, Fields>(); - - @Override - public void declare(final Fields fields) { - this.declareStream(Utils.DEFAULT_STREAM_ID, false, fields); - } - - /** - * {@inheritDoc} - * <p/> - * Direct emit is no supported by Flink. Parameter {@code direct} must be {@code false}. - * - * @throws UnsupportedOperationException - * if {@code direct} is {@code true} - */ - @Override - public void declare(final boolean direct, final Fields fields) { - this.declareStream(Utils.DEFAULT_STREAM_ID, direct, fields); - } - - @Override - public void declareStream(final String streamId, final Fields fields) { - this.declareStream(streamId, false, fields); - } - - /** - * {@inheritDoc} - * <p/> - * Direct emit is no supported by Flink. Parameter {@code direct} must be {@code false}. - * - * @throws UnsupportedOperationException - * if {@code direct} is {@code true} - */ - @Override - public void declareStream(final String streamId, final boolean direct, final Fields fields) { - if (direct) { - throw new UnsupportedOperationException("Direct emit is not supported by Flink"); - } - - this.outputStreams.put(streamId, fields); - } - - /** - * Returns {@link TypeInformation} for the declared output schema for a specific stream. - * - * @param streamId - * A stream ID. - * - * @return output type information for the declared output schema of the specified stream; or {@code null} if - * {@code streamId == null} - * - * @throws IllegalArgumentException - * If no output schema was declared for the specified stream or if more then 25 attributes got declared. - */ - public TypeInformation<?> getOutputType(final String streamId) throws IllegalArgumentException { - if (streamId == null) { - return null; - } - - Fields outputSchema = this.outputStreams.get(streamId); - if (outputSchema == null) { - throw new IllegalArgumentException("Stream with ID '" + streamId - + "' was not declared."); - } - - Tuple t; - final int numberOfAttributes = outputSchema.size(); - - if (numberOfAttributes == 1) { - return TypeExtractor.getForClass(Object.class); - } else if (numberOfAttributes <= 25) { - try { - t = Tuple.getTupleClass(numberOfAttributes).newInstance(); - } catch (final InstantiationException e) { - throw new RuntimeException(e); - } catch (final IllegalAccessException e) { - throw new RuntimeException(e); - } - } else { - throw new IllegalArgumentException("Flink supports only a maximum number of 25 attributes"); - } - - // TODO: declare only key fields as DefaultComparable - for (int i = 0; i < numberOfAttributes; ++i) { - t.setField(new DefaultComparable(), i); - } - - return TypeExtractor.getForObject(t); - } - - /** - * {@link DefaultComparable} is a {@link Comparable} helper class that is used to get the correct {@link - * TypeInformation} from {@link TypeExtractor} within {@link #getOutputType()}. If key fields are not comparable, - * Flink cannot use them and will throw an exception. - */ - private static class DefaultComparable implements Comparable<DefaultComparable> { - - public DefaultComparable() { - } - - @Override - public int compareTo(final DefaultComparable o) { - return 0; - } - } - - /** - * Computes the indexes within the declared output schema of the specified stream, for a list of given - * field-grouping attributes. - * - * @param streamId - * A stream ID. - * @param groupingFields - * The names of the key fields. - * - * @return array of {@code int}s that contains the index within the output schema for each attribute in the given - * list - */ - public int[] getGroupingFieldIndexes(final String streamId, final List<String> groupingFields) { - final int[] fieldIndexes = new int[groupingFields.size()]; - - for (int i = 0; i < fieldIndexes.length; ++i) { - fieldIndexes[i] = this.outputStreams.get(streamId).fieldIndex(groupingFields.get(i)); - } - - return fieldIndexes; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelector.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelector.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelector.java deleted file mode 100644 index 7e60a87..0000000 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelector.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.stormcompatibility.util; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; - -import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder; -import org.apache.flink.streaming.api.collector.selector.OutputSelector; - -/** - * Used by {@link FlinkTopologyBuilder} to split multiple declared output streams within Flink. - */ -public final class FlinkStormStreamSelector<T> implements OutputSelector<SplitStreamType<T>> { - private static final long serialVersionUID = 2553423379715401023L; - - /** internal cache to avoid short living ArrayList objects. */ - private final HashMap<String, List<String>> streams = new HashMap<String, List<String>>(); - - @Override - public Iterable<String> select(SplitStreamType<T> value) { - String sid = value.streamId; - List<String> streamId = this.streams.get(sid); - if (streamId == null) { - streamId = new ArrayList<String>(1); - streamId.add(sid); - this.streams.put(sid, streamId); - } - return streamId; - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkTopologyContext.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkTopologyContext.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkTopologyContext.java deleted file mode 100644 index 14af830..0000000 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkTopologyContext.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.stormcompatibility.util; - -import backtype.storm.generated.StormTopology; -import backtype.storm.hooks.ITaskHook; -import backtype.storm.metric.api.CombinedMetric; -import backtype.storm.metric.api.ICombiner; -import backtype.storm.metric.api.IMetric; -import backtype.storm.metric.api.IReducer; -import backtype.storm.metric.api.ReducedMetric; -import backtype.storm.state.ISubscribedState; -import backtype.storm.task.TopologyContext; -import backtype.storm.tuple.Fields; - -import java.util.Collection; -import java.util.List; -import java.util.Map; - -import clojure.lang.Atom; - -/** - * {@link FlinkTopologyContext} is a {@link TopologyContext} that overwrites certain method that are not applicable when - * a Storm topology is executed within Flink. - */ -public final class FlinkTopologyContext extends TopologyContext { - - /** - * Instantiates a new {@link FlinkTopologyContext} for a given Storm topology. The context object is instantiated - * for each parallel task - */ - public FlinkTopologyContext(final StormTopology topology, @SuppressWarnings("rawtypes") final Map stormConf, - final Map<Integer, String> taskToComponent, final Map<String, List<Integer>> componentToSortedTasks, - final Map<String, Map<String, Fields>> componentToStreamToFields, final String stormId, final String codeDir, - final String pidDir, final Integer taskId, final Integer workerPort, final List<Integer> workerTasks, - final Map<String, Object> defaultResources, final Map<String, Object> userResources, - final Map<String, Object> executorData, @SuppressWarnings("rawtypes") final Map registeredMetrics, - final Atom openOrPrepareWasCalled) { - super(topology, stormConf, taskToComponent, componentToSortedTasks, componentToStreamToFields, stormId, - codeDir, pidDir, taskId, workerPort, workerTasks, defaultResources, userResources, executorData, - registeredMetrics, openOrPrepareWasCalled); - } - - /** - * Not supported by Flink. - * - * @throws UnsupportedOperationException - * at every invocation - */ - @Override - public void addTaskHook(final ITaskHook hook) { - throw new UnsupportedOperationException("Task hooks are not supported by Flink"); - } - - /** - * Not supported by Flink. - * - * @throws UnsupportedOperationException - * at every invocation - */ - @Override - public Collection<ITaskHook> getHooks() { - throw new UnsupportedOperationException("Task hooks are not supported by Flink"); - } - - /** - * Not supported by Flink. - * - * @throws UnsupportedOperationException - * at every invocation - */ - @Override - public IMetric getRegisteredMetricByName(final String name) { - throw new UnsupportedOperationException("Metrics are not supported by Flink"); - - } - - /** - * Not supported by Flink. - * - * @throws UnsupportedOperationException - * at every invocation - */ - @SuppressWarnings("rawtypes") - @Override - public CombinedMetric registerMetric(final String name, final ICombiner combiner, final int timeBucketSizeInSecs) { - throw new UnsupportedOperationException("Metrics are not supported by Flink"); - } - - /** - * Not supported by Flink. - * - * @throws UnsupportedOperationException - * at every invocation - */ - @SuppressWarnings("rawtypes") - @Override - public ReducedMetric registerMetric(final String name, final IReducer combiner, final int timeBucketSizeInSecs) { - throw new UnsupportedOperationException("Metrics are not supported by Flink"); - } - - /** - * Not supported by Flink. - * - * @throws UnsupportedOperationException - * at every invocation - */ - @SuppressWarnings("unchecked") - @Override - public IMetric registerMetric(final String name, final IMetric metric, final int timeBucketSizeInSecs) { - throw new UnsupportedOperationException("Metrics are not supported by Flink"); - } - - /** - * Not supported by Flink. - * - * @throws UnsupportedOperationException - * at every invocation - */ - @Override - public <T extends ISubscribedState> T setAllSubscribedState(final T obj) { - throw new UnsupportedOperationException("Not supported by Flink"); - - } - - /** - * Not supported by Flink. - * - * @throws UnsupportedOperationException - * at every invocation - */ - @Override - public <T extends ISubscribedState> T setSubscribedState(final String componentId, final T obj) { - throw new UnsupportedOperationException("Not supported by Flink"); - } - - /** - * Not supported by Flink. - * - * @throws UnsupportedOperationException - * at every invocation - */ - @Override - public <T extends ISubscribedState> T setSubscribedState(final String componentId, final String streamId, final T - obj) { - throw new UnsupportedOperationException("Not supported by Flink"); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamMapper.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamMapper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamMapper.java deleted file mode 100644 index 9cb44ec..0000000 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamMapper.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.stormcompatibility.util; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.SplitStream; - -/** - * Strips {@link SplitStreamType}{@code <T>} away, ie, extracts the wrapped record of type {@code T}. Can be used to get - * a "clean" stream from a Spout/Bolt that declared multiple output streams (after the streams got separated using - * {@link DataStream#split(org.apache.flink.streaming.api.collector.selector.OutputSelector) .split(...)} and - * {@link SplitStream#select(String...) .select(...)}). - * - * @param <T> - */ -public class SplitStreamMapper<T> implements MapFunction<SplitStreamType<T>, T> { - private static final long serialVersionUID = 3550359150160908564L; - - @Override - public T map(SplitStreamType<T> value) throws Exception { - return value.value; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamType.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamType.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamType.java deleted file mode 100644 index 9c7e477..0000000 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamType.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.stormcompatibility.util; - -import org.apache.flink.streaming.api.datastream.DataStream; - -/** - * Used by {@link org.apache.flink.stormcompatibility.wrappers.AbstractStormCollector AbstractStormCollector} to wrap - * output tuples if multiple output streams are declared. For this case, the Flink output data stream must be split via - * {@link DataStream#split(org.apache.flink.streaming.api.collector.selector.OutputSelector) .split(...)} using - * {@link FlinkStormStreamSelector}. - */ -public class SplitStreamType<T> { - - /** The stream ID this tuple belongs to. */ - public String streamId; - /** The actual data value. */ - public T value; - - @Override - public String toString() { - return "<sid:" + this.streamId + ",v:" + this.value + ">"; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - SplitStreamType<?> other = (SplitStreamType<?>) o; - - return this.streamId.equals(other.streamId) && this.value.equals(other.value); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamTypeKeySelector.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamTypeKeySelector.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamTypeKeySelector.java deleted file mode 100644 index 200f772..0000000 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamTypeKeySelector.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.stormcompatibility.util; - -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.streaming.util.keys.KeySelectorUtil; -import org.apache.flink.streaming.util.keys.KeySelectorUtil.ArrayKeySelector; - -/** - * {@link SplitStreamTypeKeySelector} is a specific grouping key selector for streams that are selected via - * {@link FlinkStormStreamSelector} from a Spout or Bolt that declares multiple output streams. - * - * It extracts the wrapped {@link Tuple} type from the {@link SplitStreamType} tuples and applies a regular - * {@link ArrayKeySelector} on it. - */ -public class SplitStreamTypeKeySelector implements KeySelector<SplitStreamType<Tuple>, Tuple> { - private static final long serialVersionUID = 4672434660037669254L; - - private final ArrayKeySelector<Tuple> selector; - - public SplitStreamTypeKeySelector(int... fields) { - this.selector = new KeySelectorUtil.ArrayKeySelector<Tuple>(fields); - } - - @Override - public Tuple getKey(SplitStreamType<Tuple> value) throws Exception { - return selector.getKey(value.value); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/StormConfig.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/StormConfig.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/StormConfig.java deleted file mode 100644 index 6726ae8..0000000 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/StormConfig.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.stormcompatibility.util; - -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; - -import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters; - -import backtype.storm.Config; - -/** - * {@link StormConfig} is used to provide a user-defined Storm configuration (ie, a raw {@link Map} or {@link Config} - * object) for embedded Spouts and Bolts. - */ -@SuppressWarnings("rawtypes") -public final class StormConfig extends GlobalJobParameters implements Map { - private static final long serialVersionUID = 8019519109673698490L; - - /** Contains the actual configuration that is provided to Spouts and Bolts. */ - private final Map config = new HashMap(); - - /** - * Creates an empty configuration. - */ - public StormConfig() { - } - - /** - * Creates an configuration with initial values provided by the given {@code Map}. - * - * @param config - * Initial values for this configuration. - */ - @SuppressWarnings("unchecked") - public StormConfig(Map config) { - this.config.putAll(config); - } - - - @Override - public int size() { - return this.config.size(); - } - - @Override - public boolean isEmpty() { - return this.config.isEmpty(); - } - - @Override - public boolean containsKey(Object key) { - return this.config.containsKey(key); - } - - @Override - public boolean containsValue(Object value) { - return this.config.containsValue(value); - } - - @Override - public Object get(Object key) { - return this.config.get(key); - } - - @SuppressWarnings("unchecked") - @Override - public Object put(Object key, Object value) { - return this.config.put(key, value); - } - - @Override - public Object remove(Object key) { - return this.config.remove(key); - } - - @SuppressWarnings("unchecked") - @Override - public void putAll(Map m) { - this.config.putAll(m); - } - - @Override - public void clear() { - this.config.clear(); - } - - @SuppressWarnings("unchecked") - @Override - public Set<Object> keySet() { - return this.config.keySet(); - } - - @SuppressWarnings("unchecked") - @Override - public Collection<Object> values() { - return this.config.values(); - } - - @SuppressWarnings("unchecked") - @Override - public Set<java.util.Map.Entry<Object, Object>> entrySet() { - return this.config.entrySet(); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormCollector.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormCollector.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormCollector.java deleted file mode 100644 index 7b35a64..0000000 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormCollector.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.stormcompatibility.wrappers; - -import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.api.java.tuple.Tuple0; -import org.apache.flink.api.java.tuple.Tuple25; -import org.apache.flink.stormcompatibility.util.SplitStreamType; - -import java.util.HashMap; -import java.util.List; -import java.util.Map.Entry; - -/** - * A {@link AbstractStormCollector} transforms Storm tuples to Flink tuples. - */ -abstract class AbstractStormCollector<OUT> { - - /** Flink output tuple of concrete type {@link Tuple0} to {@link Tuple25} per output stream. */ - protected final HashMap<String, Tuple> outputTuple = new HashMap<String, Tuple>(); - /** Flink split tuple. Used, if multiple output streams are declared. */ - private final SplitStreamType<Object> splitTuple = new SplitStreamType<Object>(); - /** - * The number of attributes of the output tuples per stream. (Determines the concrete type of {@link #outputTuple}). - * If {@link #numberOfAttributes} is zero, {@link #outputTuple} is not used and "raw" data type is used. - */ - protected final HashMap<String, Integer> numberOfAttributes; - /** Indicates of multiple output stream are declared and thus {@link SplitStreamType} must be used as output. */ - private final boolean split; - /** Is set to {@code true} each time a tuple is emitted. */ - boolean tupleEmitted = false; - - /** - * Instantiates a new {@link AbstractStormCollector} that emits Flink tuples via {@link #doEmit(Object)}. If the - * number of attributes is negative, any output type is supported (ie, raw type). If the number of attributes is - * between 0 and 25, the output type is {@link Tuple0} to {@link Tuple25}, respectively. - * - * @param numberOfAttributes - * The number of attributes of the emitted tuples per output stream. - * @throws UnsupportedOperationException - * if the specified number of attributes is greater than 25 - */ - public AbstractStormCollector(final HashMap<String, Integer> numberOfAttributes) - throws UnsupportedOperationException { - assert (numberOfAttributes != null); - - this.numberOfAttributes = numberOfAttributes; - this.split = this.numberOfAttributes.size() > 1; - - for (Entry<String, Integer> outputStream : numberOfAttributes.entrySet()) { - final int numAtt = outputStream.getValue(); - assert (numAtt >= -1); - - if (numAtt > 25) { - throw new UnsupportedOperationException( - "Flink cannot handle more then 25 attributes, but " + numAtt - + " are declared for stream '" + outputStream.getKey() - + "' by the given bolt"); - } else if (numAtt >= 0) { - try { - this.outputTuple.put(outputStream.getKey(), - org.apache.flink.api.java.tuple.Tuple.getTupleClass(numAtt) - .newInstance()); - } catch (final InstantiationException e) { - throw new RuntimeException(e); - } catch (final IllegalAccessException e) { - throw new RuntimeException(e); - } - - } - } - } - - /** - * Transforms a Storm tuple into a Flink tuple of type {@code OUT} and emits this tuple via {@link #doEmit(Object)} - * to the specified output stream. - * - * @param The - * The output stream id. - * @param tuple - * The Storm tuple to be emitted. - * @return the return value of {@link #doEmit(Object)} - */ - @SuppressWarnings("unchecked") - protected final List<Integer> tansformAndEmit(final String streamId, final List<Object> tuple) { - List<Integer> taskIds; - - final int numAtt = this.numberOfAttributes.get(streamId); - if (numAtt > -1) { - assert (tuple.size() == numAtt); - Tuple out = this.outputTuple.get(streamId); - for (int i = 0; i < numAtt; ++i) { - out.setField(tuple.get(i), i); - } - if (this.split) { - this.splitTuple.streamId = streamId; - this.splitTuple.value = out; - - taskIds = doEmit((OUT) this.splitTuple); - } else { - taskIds = doEmit((OUT) out); - } - - } else { - assert (tuple.size() == 1); - if (split) { - this.splitTuple.streamId = streamId; - this.splitTuple.value = tuple.get(0); - - taskIds = doEmit((OUT) this.splitTuple); - } else { - taskIds = doEmit((OUT) tuple.get(0)); - } - } - this.tupleEmitted = true; - - return taskIds; - } - - /** - * Emits a Flink tuple. - * - * @param flinkTuple - * The tuple to be emitted. - * @return the IDs of the tasks this tuple was sent to - */ - protected abstract List<Integer> doEmit(OUT flinkTuple); - -} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java deleted file mode 100644 index ccd29bb..0000000 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.stormcompatibility.wrappers; - -import java.util.Collection; -import java.util.HashMap; - -import backtype.storm.generated.StormTopology; -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.topology.IRichSpout; - -import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters; -import org.apache.flink.api.java.tuple.Tuple0; -import org.apache.flink.api.java.tuple.Tuple1; -import org.apache.flink.api.java.tuple.Tuple25; -import org.apache.flink.stormcompatibility.util.StormConfig; -import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; -import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; - -/** - * A {@link AbstractStormSpoutWrapper} wraps an {@link IRichSpout} in order to execute the Storm bolt within a Flink - * Streaming program. It takes the spout's output tuples and transforms them into Flink tuples of type {@code OUT} (see - * {@link StormSpoutCollector} for supported types).<br /> - * <br /> - * <strong>CAUTION: currently, only simple spouts are supported! (ie, spouts that do not use the Storm configuration - * <code>Map</code> or <code>TopologyContext</code> that is provided by the spouts's <code>prepare(..)</code> method. - * Furthermore, ack and fail back calls as well as tuple IDs are not supported so far.</strong> - */ -public abstract class AbstractStormSpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> { - private static final long serialVersionUID = 4993283609095408765L; - - /** Number of attributes of the bolt's output tuples per stream. */ - private final HashMap<String, Integer> numberOfAttributes; - /** The wrapped Storm {@link IRichSpout spout}. */ - protected final IRichSpout spout; - /** The wrapper of the given Flink collector. */ - protected StormSpoutCollector<OUT> collector; - /** Indicates, if the source is still running or was canceled. */ - protected volatile boolean isRunning = true; - /** The original Storm topology. */ - protected StormTopology stormTopology; - - /** - * Instantiates a new {@link AbstractStormSpoutWrapper} that wraps the given Storm {@link IRichSpout spout} such - * that it can be used within a Flink streaming program. The output type will be one of {@link Tuple0} to - * {@link Tuple25} depending on the spout's declared number of attributes. - * - * @param spout - * The Storm {@link IRichSpout spout} to be used. - * @throws IllegalArgumentException - * If the number of declared output attributes is not with range [0;25]. - */ - public AbstractStormSpoutWrapper(final IRichSpout spout) throws IllegalArgumentException { - this(spout, null); - } - - /** - * Instantiates a new {@link AbstractStormSpoutWrapper} that wraps the given Storm {@link IRichSpout spout} such - * that it can be used within a Flink streaming program. The output type can be any type if parameter - * {@code rawOutput} is {@code true} and the spout's number of declared output tuples is 1. If {@code rawOutput} is - * {@code false} the output type will be one of {@link Tuple0} to {@link Tuple25} depending on the spout's declared - * number of attributes. - * - * @param spout - * The Storm {@link IRichSpout spout} to be used. - * @param rawOutputs - * Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be - * of a raw type. - * @throws IllegalArgumentException - * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if - * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range - * [0;25]. - */ - public AbstractStormSpoutWrapper(final IRichSpout spout, - final Collection<String> rawOutputs) - throws IllegalArgumentException { - this.spout = spout; - this.numberOfAttributes = StormWrapperSetupHelper.getNumberOfAttributes(spout, rawOutputs); - } - - /** - * Sets the original Storm topology. - * - * @param stormTopology - * The original Storm topology. - */ - public void setStormTopology(StormTopology stormTopology) { - this.stormTopology = stormTopology; - } - - @Override - public final void run(final SourceContext<OUT> ctx) throws Exception { - this.collector = new StormSpoutCollector<OUT>(this.numberOfAttributes, ctx); - - GlobalJobParameters config = super.getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); - StormConfig stormConfig = new StormConfig(); - - if (config != null) { - if (config instanceof StormConfig) { - stormConfig = (StormConfig) config; - } else { - stormConfig.putAll(config.toMap()); - } - } - - this.spout.open(stormConfig, - StormWrapperSetupHelper.createTopologyContext( - (StreamingRuntimeContext) super.getRuntimeContext(), - this.spout, - this.stormTopology, - null), - new SpoutOutputCollector(this.collector)); - this.spout.activate(); - this.execute(); - } - - /** - * Needs to be implemented to call the given Spout's {@link IRichSpout#nextTuple() nextTuple()} method. This method - * might use a {@code while(true)}-loop to emit an infinite number of tuples. - */ - protected abstract void execute(); - - /** - * {@inheritDoc} - * <p/> - * Sets the {@link #isRunning} flag to {@code false}. - */ - @Override - public void cancel() { - this.isRunning = false; - } - - @Override - public void close() throws Exception { - this.spout.close(); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java deleted file mode 100644 index f499ecc..0000000 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.stormcompatibility.wrappers; - -import java.util.Collection; - -import org.apache.flink.api.java.tuple.Tuple0; -import org.apache.flink.api.java.tuple.Tuple1; -import org.apache.flink.api.java.tuple.Tuple25; -import org.apache.flink.stormcompatibility.util.FiniteStormSpout; - -import com.google.common.collect.Sets; - -/** - * A {@link FiniteStormSpoutWrapper} is an {@link AbstractStormSpoutWrapper} that calls the wrapped - * {@link FiniteStormSpout}'s {@link FiniteStormSpout#nextTuple()} method until {@link - * FiniteStormSpout#reachedEnd()} is true. - */ -public class FiniteStormSpoutWrapper<OUT> extends AbstractStormSpoutWrapper<OUT> { - private static final long serialVersionUID = -218340336648247605L; - - private FiniteStormSpout finiteSpout; - - /** - * Instantiates a new {@link FiniteStormSpoutWrapper} that wraps the given Storm {@link FiniteStormSpout spout} such - * that it can be used within a Flink streaming program. The output type will be one of {@link Tuple0} to - * {@link Tuple25} depending on the spout's declared number of attributes. - * - * @param spout - * The Storm {@link FiniteStormSpout spout} to be used. - * @throws IllegalArgumentException - * If the number of declared output attributes is not with range [0;25]. - */ - public FiniteStormSpoutWrapper(FiniteStormSpout spout) - throws IllegalArgumentException { - super(spout); - this.finiteSpout = spout; - } - - /** - * Instantiates a new {@link FiniteStormSpoutWrapper} that wraps the given Storm {@link FiniteStormSpout spout} such - * that it can be used within a Flink streaming program. The output type can be any type if parameter - * {@code rawOutput} is {@code true} and the spout's number of declared output tuples is 1. If {@code rawOutput} is - * {@code false} the output type will be one of {@link Tuple0} to {@link Tuple25} depending on the spout's declared - * number of attributes. - * - * @param spout - * The Storm {@link FiniteStormSpout spout} to be used. - * @param rawOutputs - * Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be - * of a raw type. - * @throws IllegalArgumentException - * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if - * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range - * [0;25]. - */ - public FiniteStormSpoutWrapper(final FiniteStormSpout spout, final String[] rawOutputs) - throws IllegalArgumentException { - this(spout, Sets.newHashSet(rawOutputs)); - } - - /** - * Instantiates a new {@link FiniteStormSpoutWrapper} that wraps the given Storm {@link FiniteStormSpout spout} such - * that it can be used within a Flink streaming program. The output type can be any type if parameter - * {@code rawOutput} is {@code true} and the spout's number of declared output tuples is 1. If {@code rawOutput} is - * {@code false} the output type will be one of {@link Tuple0} to {@link Tuple25} depending on the spout's declared - * number of attributes. - * - * @param spout - * The Storm {@link FiniteStormSpout spout} to be used. - * @param rawOutputs - * Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be - * of a raw type. - * @throws IllegalArgumentException - * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if - * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range - * [0;25]. - */ - public FiniteStormSpoutWrapper(final FiniteStormSpout spout, final Collection<String> rawOutputs) - throws IllegalArgumentException { - super(spout, rawOutputs); - this.finiteSpout = spout; - } - - /** - * Calls the {@link FiniteStormSpout#nextTuple()} method until {@link FiniteStormSpout#reachedEnd()} is true or - * {@link FiniteStormSpout#cancel()} is called. - */ - @Override - protected void execute() { - while (super.isRunning && !finiteSpout.reachedEnd()) { - finiteSpout.nextTuple(); - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/SetupOutputFieldsDeclarer.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/SetupOutputFieldsDeclarer.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/SetupOutputFieldsDeclarer.java deleted file mode 100644 index 3cd27d4..0000000 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/SetupOutputFieldsDeclarer.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.stormcompatibility.wrappers; - -import java.util.HashMap; - -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Fields; -import backtype.storm.utils.Utils; - -/** - * {@link SetupOutputFieldsDeclarer} is used by {@link StormWrapperSetupHelper} to determine the output streams and - * number of attributes declared by the wrapped spout's or bolt's {@code declare(...)}/{@code declareStream(...)} - * method. - */ -class SetupOutputFieldsDeclarer implements OutputFieldsDeclarer { - - /** The number of attributes for each declared stream by the wrapped operator. */ - HashMap<String, Integer> outputSchemas = new HashMap<String, Integer>(); - - @Override - public void declare(final Fields fields) { - this.declareStream(Utils.DEFAULT_STREAM_ID, false, fields); - } - - @Override - public void declare(final boolean direct, final Fields fields) { - this.declareStream(Utils.DEFAULT_STREAM_ID, direct, fields); - } - - @Override - public void declareStream(final String streamId, final Fields fields) { - this.declareStream(streamId, false, fields); - } - - @Override - public void declareStream(final String streamId, final boolean direct, final Fields fields) { - if (streamId == null) { - throw new IllegalArgumentException("Stream ID cannot be null."); - } - if (direct) { - throw new UnsupportedOperationException("Direct emit is not supported by Flink"); - } - - this.outputSchemas.put(streamId, fields.size()); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollector.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollector.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollector.java deleted file mode 100644 index e810214..0000000 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollector.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.stormcompatibility.wrappers; - -import backtype.storm.task.IOutputCollector; -import backtype.storm.tuple.Tuple; - -import org.apache.flink.api.java.tuple.Tuple0; -import org.apache.flink.api.java.tuple.Tuple25; -import org.apache.flink.streaming.api.operators.Output; -import org.apache.flink.util.Collector; - -import java.util.Collection; -import java.util.HashMap; -import java.util.List; - -/** - * A {@link StormBoltCollector} is used by {@link StormBoltWrapper} to provided an Storm compatible - * output collector to the wrapped bolt. It transforms the emitted Storm tuples into Flink tuples - * and emits them via the provide {@link Output} object. - */ -class StormBoltCollector<OUT> extends AbstractStormCollector<OUT> implements IOutputCollector { - - /** The Flink output Collector */ - private final Collector<OUT> flinkOutput; - - /** - * Instantiates a new {@link StormBoltCollector} that emits Flink tuples to the given Flink output object. If the - * number of attributes is negative, any output type is supported (ie, raw type). If the number of attributes is - * between 0 and 25, the output type is {@link Tuple0} to {@link Tuple25}, respectively. - * - * @param numberOfAttributes - * The number of attributes of the emitted tuples per output stream. - * @param flinkOutput - * The Flink output object to be used. - * @throws UnsupportedOperationException - * if the specified number of attributes is greater than 25 - */ - public StormBoltCollector(final HashMap<String, Integer> numberOfAttributes, - final Collector<OUT> flinkOutput) throws UnsupportedOperationException { - super(numberOfAttributes); - assert (flinkOutput != null); - this.flinkOutput = flinkOutput; - } - - @Override - protected List<Integer> doEmit(final OUT flinkTuple) { - this.flinkOutput.collect(flinkTuple); - // TODO - return null; - } - - @Override - public void reportError(final Throwable error) { - // not sure, if Flink can support this - throw new UnsupportedOperationException("Not implemented yet"); - } - - @Override - public List<Integer> emit(final String streamId, final Collection<Tuple> anchors, final List<Object> tuple) { - return this.tansformAndEmit(streamId, tuple); - } - - @Override - public void emitDirect(final int taskId, final String streamId, final Collection<Tuple> anchors, final List<Object> tuple) { - throw new UnsupportedOperationException("Direct emit is not supported by Flink"); - } - - @Override - public void ack(final Tuple input) { - throw new UnsupportedOperationException("Currently, acking/failing is not supported by Flink"); - } - - @Override - public void fail(final Tuple input) { - throw new UnsupportedOperationException("Currently, acking/failing is not supported by Flink"); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java deleted file mode 100644 index 715d6df..0000000 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java +++ /dev/null @@ -1,260 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.stormcompatibility.wrappers; - -import java.util.Collection; -import java.util.HashMap; - -import backtype.storm.generated.StormTopology; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.IRichBolt; -import backtype.storm.tuple.Fields; - -import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters; -import org.apache.flink.api.java.tuple.Tuple0; -import org.apache.flink.api.java.tuple.Tuple1; -import org.apache.flink.api.java.tuple.Tuple25; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.stormcompatibility.util.SplitStreamType; -import org.apache.flink.stormcompatibility.util.StormConfig; -import org.apache.flink.streaming.api.operators.AbstractStreamOperator; -import org.apache.flink.streaming.api.operators.OneInputStreamOperator; -import org.apache.flink.streaming.api.operators.TimestampedCollector; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; - -import com.google.common.collect.Sets; - -/** - * A {@link StormBoltWrapper} wraps an {@link IRichBolt} in order to execute the Storm bolt within a Flink Streaming - * program. It takes the Flink input tuples of type {@code IN} and transforms them into {@link StormTuple}s that the - * bolt can process. Furthermore, it takes the bolt's output tuples and transforms them into Flink tuples of type - * {@code OUT} (see {@link AbstractStormCollector} for supported types).<br /> - * <br /> - * <strong>CAUTION: currently, only simple bolts are supported! (ie, bolts that do not use the Storm configuration - * <code>Map</code> or <code>TopologyContext</code> that is provided by the bolt's <code>open(..)</code> method. - * Furthermore, acking and failing of tuples as well as accessing tuple attributes by field names is not supported so - * far.</strong> - */ -public class StormBoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements OneInputStreamOperator<IN, OUT> { - private static final long serialVersionUID = -4788589118464155835L; - - /** The wrapped Storm {@link IRichBolt bolt}. */ - private final IRichBolt bolt; - /** Number of attributes of the bolt's output tuples per stream. */ - private final HashMap<String, Integer> numberOfAttributes; - /** The schema (ie, ordered field names) of the input stream. */ - private final Fields inputSchema; - /** The original Storm topology. */ - protected StormTopology stormTopology; - - /** - * We have to use this because Operators must output - * {@link org.apache.flink.streaming.runtime.streamrecord.StreamRecord}. - */ - private TimestampedCollector<OUT> flinkCollector; - - /** - * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be - * used within a Flink streaming program. As no input schema is defined, attribute-by-name access in only possible - * for POJO input types. The output type will be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's - * declared number of attributes. - * - * @param bolt - * The Storm {@link IRichBolt bolt} to be used. - * @throws IllegalArgumentException - * If the number of declared output attributes is not with range [0;25]. - */ - public StormBoltWrapper(final IRichBolt bolt) throws IllegalArgumentException { - this(bolt, null, (Collection<String>) null); - } - - /** - * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be - * used within a Flink streaming program. The given input schema enable attribute-by-name access for input types - * {@link Tuple0} to {@link Tuple25}. The output type will be one of {@link Tuple0} to {@link Tuple25} depending on - * the bolt's declared number of attributes. - * - * @param bolt - * The Storm {@link IRichBolt bolt} to be used. - * @param inputSchema - * The schema (ie, ordered field names) of the input stream. - * @throws IllegalArgumentException - * If the number of declared output attributes is not with range [0;25]. - */ - public StormBoltWrapper(final IRichBolt bolt, final Fields inputSchema) - throws IllegalArgumentException { - this(bolt, inputSchema, (Collection<String>) null); - } - - /** - * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be - * used within a Flink streaming program. As no input schema is defined, attribute-by-name access in only possible - * for POJO input types. The output type can be any type if parameter {@code rawOutput} is {@code true} and the - * bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will be one - * of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes. - * - * @param bolt - * The Storm {@link IRichBolt bolt} to be used. - * @param rawOutputs - * Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be - * of a raw type. - * @throws IllegalArgumentException - * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if - * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range - * [1;25]. - */ - public StormBoltWrapper(final IRichBolt bolt, final String[] rawOutputs) - throws IllegalArgumentException { - this(bolt, null, Sets.newHashSet(rawOutputs)); - } - - /** - * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be - * used within a Flink streaming program. As no input schema is defined, attribute-by-name access in only possible - * for POJO input types. The output type can be any type if parameter {@code rawOutput} is {@code true} and the - * bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will be one - * of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes. - * - * @param bolt - * The Storm {@link IRichBolt bolt} to be used. - * @param rawOutputs - * Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be - * of a raw type. - * @throws IllegalArgumentException - * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if - * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range - * [1;25]. - */ - public StormBoltWrapper(final IRichBolt bolt, final Collection<String> rawOutputs) - throws IllegalArgumentException { - this(bolt, null, rawOutputs); - } - - /** - * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be - * used within a Flink streaming program. The given input schema enable attribute-by-name access for input types - * {@link Tuple0} to {@link Tuple25}. The output type can be any type if parameter {@code rawOutput} is {@code true} - * and the bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will - * be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes. - * - * @param bolt - * The Storm {@link IRichBolt bolt} to be used. - * @param inputSchema - * The schema (ie, ordered field names) of the input stream. - * @param rawOutputs - * Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be - * of a raw type. - * @throws IllegalArgumentException - * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if - * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range - * [0;25]. - */ - public StormBoltWrapper(final IRichBolt bolt, final Fields inputSchema, - final String[] rawOutputs) throws IllegalArgumentException { - this(bolt, inputSchema, Sets.newHashSet(rawOutputs)); - } - - /** - * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be - * used within a Flink streaming program. The given input schema enable attribute-by-name access for input types - * {@link Tuple0} to {@link Tuple25}. The output type can be any type if parameter {@code rawOutput} is {@code true} - * and the bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will - * be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes. - * - * @param bolt - * The Storm {@link IRichBolt bolt} to be used. - * @param inputSchema - * The schema (ie, ordered field names) of the input stream. - * @param rawOutputs - * Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be - * of a raw type. - * @throws IllegalArgumentException - * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if - * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range - * [0;25]. - */ - public StormBoltWrapper(final IRichBolt bolt, final Fields inputSchema, - final Collection<String> rawOutputs) throws IllegalArgumentException { - this.bolt = bolt; - this.inputSchema = inputSchema; - this.numberOfAttributes = StormWrapperSetupHelper.getNumberOfAttributes(bolt, rawOutputs); - } - - /** - * Sets the original Storm topology. - * - * @param stormTopology - * The original Storm topology. - */ - public void setStormTopology(StormTopology stormTopology) { - this.stormTopology = stormTopology; - } - - @Override - public void open(final Configuration parameters) throws Exception { - super.open(parameters); - - final TopologyContext topologyContext = StormWrapperSetupHelper.createTopologyContext( - super.runtimeContext, this.bolt, this.stormTopology, null); - flinkCollector = new TimestampedCollector<OUT>(output); - OutputCollector stormCollector = null; - - if (this.numberOfAttributes.size() > 0) { - stormCollector = new OutputCollector(new StormBoltCollector<OUT>( - this.numberOfAttributes, flinkCollector)); - } - - GlobalJobParameters config = super.executionConfig.getGlobalJobParameters(); - StormConfig stormConfig = new StormConfig(); - - if (config != null) { - if (config instanceof StormConfig) { - stormConfig = (StormConfig) config; - } else { - stormConfig.putAll(config.toMap()); - } - } - - this.bolt.prepare(stormConfig, topologyContext, stormCollector); - } - - @Override - public void dispose() { - this.bolt.cleanup(); - } - - @SuppressWarnings("unchecked") - @Override - public void processElement(final StreamRecord<IN> element) throws Exception { - flinkCollector.setTimestamp(element.getTimestamp()); - IN value = element.getValue(); - if (value instanceof SplitStreamType) { - this.bolt.execute(new StormTuple<IN>(((SplitStreamType<IN>) value).value, - inputSchema)); - } else { - this.bolt.execute(new StormTuple<IN>(value, inputSchema)); - } - } - - @Override - public void processWatermark(Watermark mark) throws Exception { - output.emitWatermark(mark); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java deleted file mode 100644 index 45eb56c..0000000 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java +++ /dev/null @@ -1,186 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.stormcompatibility.wrappers; - -import java.util.Collection; - -import backtype.storm.topology.IRichSpout; - -import org.apache.flink.api.java.tuple.Tuple0; -import org.apache.flink.api.java.tuple.Tuple1; -import org.apache.flink.api.java.tuple.Tuple25; - -import com.google.common.collect.Sets; - -/** - * A {@link StormFiniteSpoutWrapper} is an {@link AbstractStormSpoutWrapper} that calls {@link IRichSpout#nextTuple() - * nextTuple()} for finite number of times before - * {@link #run(org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext)} returns. The number of - * {@code nextTuple()} calls can be specified as a certain number of invocations or can be undefined. In the undefined - * case, the {@code run(...)} method return if no record was emitted to the output collector for the first time. - */ -public class StormFiniteSpoutWrapper<OUT> extends AbstractStormSpoutWrapper<OUT> { - private static final long serialVersionUID = 3883246587044801286L; - - /** The number of {@link IRichSpout#nextTuple()} calls */ - private int numberOfInvocations; - - /** - * Instantiates a new {@link StormFiniteSpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()} - * method of the given Storm {@link IRichSpout spout} as long as it emits records to the output collector. The - * output type will be one of {@link Tuple0} to {@link Tuple25} depending on the spout's declared number of - * attributes. - * - * @param spout - * The Storm {@link IRichSpout spout} to be used. - * @throws IllegalArgumentException - * If the number of declared output attributes is not with range [0;25]. - */ - public StormFiniteSpoutWrapper(final IRichSpout spout) throws IllegalArgumentException { - this(spout, (Collection<String>) null, -1); - } - - /** - * Instantiates a new {@link StormFiniteSpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()} - * method of the given Storm {@link IRichSpout spout} {@code numberOfInvocations} times. The output type will be one - * of {@link Tuple0} to {@link Tuple25} depending on the spout's declared number of attributes. - * - * @param spout - * The Storm {@link IRichSpout spout} to be used. - * @param numberOfInvocations - * The number of calls to {@link IRichSpout#nextTuple()}. - * @throws IllegalArgumentException - * If the number of declared output attributes is not with range [0;25]. - */ - public StormFiniteSpoutWrapper(final IRichSpout spout, final int numberOfInvocations) - throws IllegalArgumentException { - this(spout, (Collection<String>) null, numberOfInvocations); - } - - /** - * Instantiates a new {@link StormFiniteSpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()} - * method of the given Storm {@link IRichSpout spout} as long as it emits records to the output collector. The - * output type can be any type if parameter {@code rawOutput} is {@code true} and the spout's number of declared - * output tuples is 1. If {@code rawOutput} is {@code false} the output type will be one of {@link Tuple0} to - * {@link Tuple25} depending on the spout's declared number of attributes. - * - * @param spout - * The Storm {@link IRichSpout spout} to be used. - * @param rawOutputs - * Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be - * of a raw type. - * @throws IllegalArgumentException - * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if - * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range - * [0;25]. - */ - public StormFiniteSpoutWrapper(final IRichSpout spout, final String[] rawOutputs) - throws IllegalArgumentException { - this(spout, Sets.newHashSet(rawOutputs), -1); - } - - /** - * Instantiates a new {@link StormFiniteSpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()} - * method of the given Storm {@link IRichSpout spout} as long as it emits records to the output collector. The - * output type can be any type if parameter {@code rawOutput} is {@code true} and the spout's number of declared - * output tuples is 1. If {@code rawOutput} is {@code false} the output type will be one of {@link Tuple0} to - * {@link Tuple25} depending on the spout's declared number of attributes. - * - * @param spout - * The Storm {@link IRichSpout spout} to be used. - * @param rawOutputs - * Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be - * of a raw type. - * @throws IllegalArgumentException - * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if - * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range - * [0;25]. - */ - public StormFiniteSpoutWrapper(final IRichSpout spout, final Collection<String> rawOutputs) - throws IllegalArgumentException { - this(spout, rawOutputs, -1); - } - - /** - * Instantiates a new {@link StormFiniteSpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()} - * method of the given Storm {@link IRichSpout spout} {@code numberOfInvocations} times. The output type can be any - * type if parameter {@code rawOutput} is {@code true} and the spout's number of declared output tuples is 1. If - * {@code rawOutput} is {@code false} the output type will be one of {@link Tuple0} to {@link Tuple25} depending on - * the spout's declared number of attributes. - * - * @param spout - * The Storm {@link IRichSpout spout} to be used. - * @param rawOutputs - * Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be - * of a raw type. - * @param numberOfInvocations - * The number of calls to {@link IRichSpout#nextTuple()}. - * @throws IllegalArgumentException - * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if - * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range - * [0;25]. - */ - public StormFiniteSpoutWrapper(final IRichSpout spout, final String[] rawOutputs, - final int numberOfInvocations) throws IllegalArgumentException { - super(spout, Sets.newHashSet(rawOutputs)); - this.numberOfInvocations = numberOfInvocations; - } - - /** - * Instantiates a new {@link StormFiniteSpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()} - * method of the given Storm {@link IRichSpout spout} {@code numberOfInvocations} times. The output type can be any - * type if parameter {@code rawOutput} is {@code true} and the spout's number of declared output tuples is 1. If - * {@code rawOutput} is {@code false} the output type will be one of {@link Tuple0} to {@link Tuple25} depending on - * the spout's declared number of attributes. - * - * @param spout - * The Storm {@link IRichSpout spout} to be used. - * @param rawOutputs - * Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be - * of a raw type. - * @param numberOfInvocations - * The number of calls to {@link IRichSpout#nextTuple()}. - * @throws IllegalArgumentException - * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if - * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range - * [0;25]. - */ - public StormFiniteSpoutWrapper(final IRichSpout spout, final Collection<String> rawOutputs, - final int numberOfInvocations) throws IllegalArgumentException { - super(spout, rawOutputs); - this.numberOfInvocations = numberOfInvocations; - } - - /** - * Calls {@link IRichSpout#nextTuple()} for the given number of times. - */ - @Override - protected void execute() { - if (this.numberOfInvocations >= 0) { - while ((--this.numberOfInvocations >= 0) && super.isRunning) { - super.spout.nextTuple(); - } - } else { - do { - super.collector.tupleEmitted = false; - super.spout.nextTuple(); - } while (super.collector.tupleEmitted && super.isRunning); - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollector.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollector.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollector.java deleted file mode 100644 index 5a20056..0000000 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollector.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.stormcompatibility.wrappers; - -import backtype.storm.spout.ISpoutOutputCollector; - -import org.apache.flink.api.java.tuple.Tuple0; -import org.apache.flink.api.java.tuple.Tuple25; -import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; - -import java.util.HashMap; -import java.util.List; - -/** - * A {@link StormSpoutCollector} is used by {@link AbstractStormSpoutWrapper} to provided an Storm - * compatible output collector to the wrapped spout. It transforms the emitted Storm tuples into - * Flink tuples and emits them via the provide {@link SourceContext} object. - */ -class StormSpoutCollector<OUT> extends AbstractStormCollector<OUT> implements ISpoutOutputCollector { - - /** The Flink source context object */ - private final SourceContext<OUT> flinkContext; - - /** - * Instantiates a new {@link StormSpoutCollector} that emits Flink tuples to the given Flink source context. If the - * number of attributes is specified as zero, any output type is supported. If the number of attributes is between 0 - * to 25, the output type is {@link Tuple0} to {@link Tuple25}, respectively. - * - * @param numberOfAttributes - * The number of attributes of the emitted tuples. - * @param flinkContext - * The Flink source context to be used. - * @throws UnsupportedOperationException - * if the specified number of attributes is greater than 25 - */ - public StormSpoutCollector(final HashMap<String, Integer> numberOfAttributes, - final SourceContext<OUT> flinkContext) throws UnsupportedOperationException { - super(numberOfAttributes); - assert (flinkContext != null); - this.flinkContext = flinkContext; - } - - @Override - protected List<Integer> doEmit(final OUT flinkTuple) { - this.flinkContext.collect(flinkTuple); - // TODO - return null; - } - - @Override - public void reportError(final Throwable error) { - // not sure, if Flink can support this - throw new UnsupportedOperationException("Not implemented yet"); - } - - @Override - public List<Integer> emit(final String streamId, final List<Object> tuple, final Object messageId) { - return this.tansformAndEmit(streamId, tuple); - } - - - @Override - public void emitDirect(final int taskId, final String streamId, final List<Object> tuple, final Object messageId) { - throw new UnsupportedOperationException("Direct emit is not supported by Flink"); - } - -}