[streaming] Replaced partitionBy with groupBy + re-added global partitioning
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/10a81862 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/10a81862 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/10a81862 Branch: refs/heads/master Commit: 10a81862562d6e87300cdb21c7cd719cddeee083 Parents: b22406a Author: Gyula Fora <[email protected]> Authored: Wed Jan 7 10:24:00 2015 +0100 Committer: mbalassi <[email protected]> Committed: Thu Jan 8 13:35:28 2015 +0100 ---------------------------------------------------------------------- docs/streaming_guide.md | 6 +- .../connectors/twitter/TwitterLocal.java | 98 -------------------- .../connectors/twitter/TwitterTopology.java | 92 ++++++++++++++++++ .../streaming/api/datastream/DataStream.java | 67 ++++++------- .../datastream/SingleOutputStreamOperator.java | 16 ---- .../examples/twitter/TwitterStream.java | 2 +- .../windowing/MultiplePoliciesExample.java | 9 +- .../flink/streaming/api/scala/DataStream.scala | 40 ++------ 8 files changed, 137 insertions(+), 193 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/10a81862/docs/streaming_guide.md ---------------------------------------------------------------------- diff --git a/docs/streaming_guide.md b/docs/streaming_guide.md index c7e7060..7808b9e 100644 --- a/docs/streaming_guide.md +++ b/docs/streaming_guide.md @@ -162,11 +162,11 @@ Usage: `dataStream.shuffle()` * *Distribute*: Distribute partitioning directs the output data stream to the next operator in a round-robin fashion, achieving a balanced distribution. Usage: `dataStream.distribute()` * *Field/Key*: Field/Key partitioning partitions the output data stream based on the hash code of a selected key of the tuples. Data points with the same key are directed to the same operator instance. 
The user can define keys by field positions (for tuple and array types), field expressions (for Pojo types) and custom keys using the `KeySelector` interface. -Usage: `dataStream.partitionBy(keys)` +Usage: `dataStream.groupBy(keys)` * *Broadcast*: Broadcast partitioning sends the output data stream to all parallel instances of the next operator. Usage: `dataStream.broadcast()` - * *Global*: All data points end up at the same operator instance. To achieve this use the parallelism setting of the corresponding operator. -Usage: `operator.setParallelism(1)` + * *Global*: All data are sent to the first instance of the next processing operator. Use this option with care to avoid serious performance bottlenecks. +Usage: `dataStream.global()` ### Sources http://git-wip-us.apache.org/repos/asf/flink/blob/10a81862/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java deleted file mode 100644 index 3058047..0000000 --- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterLocal.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.twitter; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.connectors.json.JSONParseFlatMap; -import org.apache.flink.util.Collector; -import org.apache.sling.commons.json.JSONException; - -/** - * This program demonstrate the use of TwitterSource. - * Its aim is to count the frequency of the languages of tweets - */ -public class TwitterLocal { - - private static final int PARALLELISM = 1; - private static final int SOURCE_PARALLELISM = 1; - private static final int NUMBEROFTWEETS = 100; - - /** - * FlatMapFunction to determine the language of tweets if possible - */ - public static class SelectLanguageFlatMap extends - JSONParseFlatMap<String, String> { - - private static final long serialVersionUID = 1L; - - /** - * Select the language from the incoming JSON text - */ - @Override - public void flatMap(String value, Collector<String> out) throws Exception { - try{ - out.collect(getString(value, "lang")); - } - catch (JSONException e){ - out.collect(""); - } - } - - } - - public static void main(String[] args) throws Exception { - - String path = new String(); - - if (args != null && args.length == 1) { - path = args[0]; - } else { - System.err.println("USAGE:\nTwitterLocal <pathToPropertiesFile>"); - return; - } - - StreamExecutionEnvironment env = 
StreamExecutionEnvironment - .createLocalEnvironment(PARALLELISM); - - DataStream<String> streamSource = env.addSource(new TwitterSource(path, NUMBEROFTWEETS)) - .setParallelism(SOURCE_PARALLELISM); - - - DataStream<Tuple2<String, Integer>> dataStream = streamSource - .flatMap(new SelectLanguageFlatMap()) - .partitionBy(0) - .map(new MapFunction<String, Tuple2<String, Integer>>() { - - private static final long serialVersionUID = 1L; - - @Override - public Tuple2<String, Integer> map(String value) throws Exception { - return new Tuple2<String, Integer>(value, 1); - } - }) - .groupBy(0) - .sum(1); - - dataStream.print(); - - env.execute(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/10a81862/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java new file mode 100644 index 0000000..4bc6df0 --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.twitter; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.json.JSONParseFlatMap; +import org.apache.flink.util.Collector; +import org.apache.sling.commons.json.JSONException; + +/** + * This program demonstrate the use of TwitterSource. + * Its aim is to count the frequency of the languages of tweets + */ +public class TwitterTopology { + + private static final int NUMBEROFTWEETS = 100; + + /** + * FlatMapFunction to determine the language of tweets if possible + */ + public static class SelectLanguageFlatMap extends + JSONParseFlatMap<String, String> { + + private static final long serialVersionUID = 1L; + + /** + * Select the language from the incoming JSON text + */ + @Override + public void flatMap(String value, Collector<String> out) throws Exception { + try{ + out.collect(getString(value, "lang")); + } + catch (JSONException e){ + out.collect(""); + } + } + + } + + public static void main(String[] args) throws Exception { + + String path = new String(); + + if (args != null && args.length == 1) { + path = args[0]; + } else { + System.err.println("USAGE:\nTwitterLocal <pathToPropertiesFile>"); + return; + } + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream<String> streamSource = env.addSource(new 
TwitterSource(path, NUMBEROFTWEETS)); + + + DataStream<Tuple2<String, Integer>> dataStream = streamSource + .flatMap(new SelectLanguageFlatMap()) + .map(new MapFunction<String, Tuple2<String, Integer>>() { + private static final long serialVersionUID = 1L; + + @Override + public Tuple2<String, Integer> map(String value) throws Exception { + return new Tuple2<String, Integer>(value, 1); + } + }) + .groupBy(0) + .sum(1); + + dataStream.print(); + + env.execute(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/10a81862/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index e969647..8e21218 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -67,6 +67,7 @@ import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy; import org.apache.flink.streaming.partitioner.BroadcastPartitioner; import org.apache.flink.streaming.partitioner.DistributePartitioner; import org.apache.flink.streaming.partitioner.FieldsPartitioner; +import org.apache.flink.streaming.partitioner.GlobalPartitioner; import org.apache.flink.streaming.partitioner.ShufflePartitioner; import org.apache.flink.streaming.partitioner.StreamPartitioner; import org.apache.flink.streaming.util.keys.KeySelectorUtil; @@ -239,7 +240,9 @@ public class DataStream<OUT> { /** * Groups the elements of a {@link DataStream} by the given key positions to * be used with grouped operators like - * {@link 
GroupedDataStream#reduce(ReduceFunction)} + * {@link GroupedDataStream#reduce(ReduceFunction)}</p> This operator also + * affects the partitioning of the stream, by forcing values with the same + * key to go to the same processing instance. * * @param fields * The position of the fields on which the {@link DataStream} @@ -259,7 +262,9 @@ public class DataStream<OUT> { * is either the name of a public field or a getter method with parentheses * of the {@link DataStream}S underlying type. A dot can be used to drill * down into objects, as in {@code "field1.getInnerField2()" }. This method - * returns an {@link GroupedDataStream}. + * returns an {@link GroupedDataStream}.</p> This operator also affects the + * partitioning of the stream, by forcing values with the same key to go to + * the same processing instance. * * @param fields * One or more field expressions on which the DataStream will be @@ -275,7 +280,10 @@ public class DataStream<OUT> { /** * Groups the elements of a {@link DataStream} by the key extracted by the * {@link KeySelector} to be used with grouped operators like - * {@link GroupedDataStream#reduce(ReduceFunction)} + * {@link GroupedDataStream#reduce(ReduceFunction)}. + * <p/> + * This operator also affects the partitioning of the stream, by forcing + * values with the same key to go to the same processing instance. * * @param keySelector * The {@link KeySelector} that will be used to extract keys for @@ -293,42 +301,6 @@ public class DataStream<OUT> { /** * Sets the partitioning of the {@link DataStream} so that the output is - * partitioned by the selected fields. This setting only effects the how the - * outputs will be distributed between the parallel instances of the next - * processing operator. - * - * @param fields - * The fields to partition by. - * @return The DataStream with fields partitioning set. - */ - public DataStream<OUT> partitionBy(int... 
fields) { - if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) { - return partitionBy(new KeySelectorUtil.ArrayKeySelector<OUT>(fields)); - } else { - return partitionBy(new Keys.ExpressionKeys<OUT>(fields, getType())); - } - } - - /** - * Sets the partitioning of the {@link DataStream} so that the output is - * partitioned by the given field expressions. This setting only effects the - * how the outputs will be distributed between the parallel instances of the - * next processing operator. - * - * @param fields - * The fields expressions to partition by. - * @return The DataStream with fields partitioning set. - */ - public DataStream<OUT> partitionBy(String... fields) { - return partitionBy(new Keys.ExpressionKeys<OUT>(fields, getType())); - } - - private DataStream<OUT> partitionBy(Keys<OUT> keys) { - return partitionBy(KeySelectorUtil.getSelectorForKeys(keys, getType())); - } - - /** - * Sets the partitioning of the {@link DataStream} so that the output is * partitioned using the given {@link KeySelector}. This setting only * effects the how the outputs will be distributed between the parallel * instances of the next processing operator. @@ -336,7 +308,7 @@ public class DataStream<OUT> { * @param keySelector * @return */ - public DataStream<OUT> partitionBy(KeySelector<OUT, ?> keySelector) { + protected DataStream<OUT> partitionBy(KeySelector<OUT, ?> keySelector) { return setConnectionType(new FieldsPartitioner<OUT>(clean(keySelector))); } @@ -390,6 +362,18 @@ public class DataStream<OUT> { } /** + * Sets the partitioning of the {@link DataStream} so that the output values + * all go to the first instance of the next processing operator. Use this + * setting with care since it might cause a serious performance bottleneck + * in the application. + * + * @return The DataStream with shuffle partitioning set. 
+ */ + public DataStream<OUT> global() { + return setConnectionType(new GlobalPartitioner<OUT>()); + } + + /** * Initiates an iterative part of the program that feeds back data streams. * The iterative part needs to be closed by calling * {@link IterativeDataStream#closeWith(DataStream)}. The transformation of @@ -1007,7 +991,8 @@ public class DataStream<OUT> { protected <R> DataStream<OUT> addIterationSource(Integer iterationID, long waitTime) { - DataStream<R> returnStream = new DataStreamSource<R>(environment, "iterationSource", null, true); + DataStream<R> returnStream = new DataStreamSource<R>(environment, "iterationSource", null, + true); jobGraphBuilder.addIterationHead(returnStream.getId(), this.getId(), iterationID, degreeOfParallelism, waitTime); http://git-wip-us.apache.org/repos/asf/flink/blob/10a81862/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java index 4b6edc0..aa85579 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java @@ -24,7 +24,6 @@ import org.apache.commons.lang3.SerializationException; import org.apache.commons.lang3.SerializationUtils; import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.functions.KeySelector; import 
org.apache.flink.streaming.api.collector.OutputSelector; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext; @@ -151,21 +150,6 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato } @SuppressWarnings("unchecked") - public SingleOutputStreamOperator<OUT, O> partitionBy(int... keypositions) { - return (SingleOutputStreamOperator<OUT, O>) super.partitionBy(keypositions); - } - - @SuppressWarnings("unchecked") - public SingleOutputStreamOperator<OUT, O> partitionBy(String... fields) { - return (SingleOutputStreamOperator<OUT, O>) super.partitionBy(fields); - } - - @SuppressWarnings("unchecked") - public SingleOutputStreamOperator<OUT, O> partitionBy(KeySelector<OUT, ?> keySelector) { - return (SingleOutputStreamOperator<OUT, O>) super.partitionBy(keySelector); - } - - @SuppressWarnings("unchecked") public SingleOutputStreamOperator<OUT, O> broadcast() { return (SingleOutputStreamOperator<OUT, O>) super.broadcast(); } http://git-wip-us.apache.org/repos/asf/flink/blob/10a81862/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java index 08aa5cb..1901475 100644 --- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java +++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java @@ -71,7 +71,7 @@ public class TwitterStream { DataStream<Tuple2<String, Integer>> tweets = 
streamSource // selecting English tweets and splitting to words - .flatMap(new SelectEnglishAndTokenizeFlatMap()).partitionBy(0) + .flatMap(new SelectEnglishAndTokenizeFlatMap()) // returning (word, 1) .map(new MapFunction<String, Tuple2<String, Integer>>() { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/10a81862/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.java index 6f031c3..48783f2 100644 --- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.java +++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.examples.windowing; import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.function.source.SourceFunction; @@ -43,7 +44,13 @@ public class MultiplePoliciesExample { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<String> stream = env.addSource(new BasicSource()) - .groupBy(0) + .groupBy(new KeySelector<String, String>(){ + private static final long serialVersionUID = 1L; + @Override + public String getKey(String value) throws Exception 
{ + return value; + } + }) .window(Count.of(2)) .every(Count.of(3), Count.of(5)) .reduceGroup(new Concat()); http://git-wip-us.apache.org/repos/asf/flink/blob/10a81862/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala index ffe91cb..dfaa316 100644 --- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala +++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala @@ -127,39 +127,6 @@ class DataStream[T](javaStream: JavaStream[T]) { } /** - * Sets the partitioning of the DataStream so that the output is - * partitioned by the selected fields. This setting only effects the how the outputs will be - * distributed between the parallel instances of the next processing operator. - * - */ - def partitionBy(fields: Int*): DataStream[T] = - javaStream.partitionBy(fields: _*) - - /** - * Sets the partitioning of the DataStream so that the output is - * partitioned by the selected fields. This setting only effects the how the outputs will be - * distributed between the parallel instances of the next processing operator. - * - */ - def partitionBy(firstField: String, otherFields: String*): DataStream[T] = - javaStream.partitionBy(firstField +: otherFields.toArray: _*) - - /** - * Sets the partitioning of the DataStream so that the output is - * partitioned by the given Key. This setting only effects the how the outputs will be - * distributed between the parallel instances of the next processing operator. 
- * - */ - def partitionBy[K: TypeInformation](fun: T => K): DataStream[T] = { - - val keyExtractor = new KeySelector[T, K] { - val cleanFun = clean(fun) - def getKey(in: T) = cleanFun(in) - } - javaStream.partitionBy(keyExtractor) - } - - /** - * Sets the partitioning of the DataStream so that the output tuples - * are broadcasted to every parallel instance of the next component. This - * setting only effects the how the outputs will be distributed between the @@ -167,6 +134,13 @@ class DataStream[T](javaStream: JavaStream[T]) { - * - */ - def broadcast: DataStream[T] = javaStream.broadcast() + + /** + * Sets the partitioning of the DataStream so that the output values all go to + * the first instance of the next processing operator. Use this setting with care + * since it might cause a serious performance bottleneck in the application. + */ + def global: DataStream[T] = javaStream.global() /** * Sets the partitioning of the DataStream so that the output tuples
