This is an automated email from the ASF dual-hosted git repository.

xccui pushed a commit to branch release-1.5 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.5 by this push: new 9f1c12c [FLINK-9289] [Dataset] Parallelism of generated operators should have the max parallelism of input 9f1c12c is described below commit 9f1c12c10c3eb7b302f0688ed1b60fd08942dc03 Author: Xingcan Cui <xingc...@gmail.com> AuthorDate: Sun May 13 20:20:36 2018 +0800 [FLINK-9289] [Dataset] Parallelism of generated operators should have the max parallelism of input This closes #6003. --- .../flink/api/java/operators/KeyFunctions.java | 27 +++ .../flink/api/java/operators/UnionOperator.java | 7 + .../translation/UnionTranslationTest.java | 182 +++++++++++++++++++++ .../flink/graph/library/linkanalysis/PageRank.java | 1 - .../apache/flink/python/api/PythonPlanBinder.java | 2 +- 5 files changed, 217 insertions(+), 2 deletions(-) diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java index f6336cd..3e7a552 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java @@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys; import org.apache.flink.api.common.operators.UnaryOperatorInformation; +import org.apache.flink.api.common.operators.Union; import org.apache.flink.api.common.operators.base.MapOperatorBase; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.operators.translation.KeyExtractingMapper; @@ -43,6 +44,19 @@ public class KeyFunctions { org.apache.flink.api.common.operators.Operator<T> input, SelectorFunctionKeys<T, K> key) { + if (input instanceof Union) { + // if input is a union, we apply the key extractors recursively to all inputs + 
org.apache.flink.api.common.operators.Operator<T> firstInput = ((Union) input).getFirstInput(); + org.apache.flink.api.common.operators.Operator<T> secondInput = ((Union) input).getSecondInput(); + + org.apache.flink.api.common.operators.Operator<Tuple2<K, T>> firstInputWithKey = + appendKeyExtractor(firstInput, key); + org.apache.flink.api.common.operators.Operator<Tuple2<K, T>> secondInputWithKey = + appendKeyExtractor(secondInput, key); + + return new Union(firstInputWithKey, secondInputWithKey, input.getName()); + } + TypeInformation<T> inputType = key.getInputType(); TypeInformation<Tuple2<K, T>> typeInfoWithKey = createTypeWithKey(key); KeyExtractingMapper<T, K> extractor = new KeyExtractingMapper(key.getKeyExtractor()); @@ -66,6 +80,19 @@ public class KeyFunctions { SelectorFunctionKeys<T, K1> key1, SelectorFunctionKeys<T, K2> key2) { + if (input instanceof Union) { + // if input is a union, we apply the key extractors recursively to all inputs + org.apache.flink.api.common.operators.Operator<T> firstInput = ((Union) input).getFirstInput(); + org.apache.flink.api.common.operators.Operator<T> secondInput = ((Union) input).getSecondInput(); + + org.apache.flink.api.common.operators.Operator<Tuple3<K1, K2, T>> firstInputWithKey = + appendKeyExtractor(firstInput, key1, key2); + org.apache.flink.api.common.operators.Operator<Tuple3<K1, K2, T>> secondInputWithKey = + appendKeyExtractor(secondInput, key1, key2); + + return new Union(firstInputWithKey, secondInputWithKey, input.getName()); + } + TypeInformation<T> inputType = key1.getInputType(); TypeInformation<Tuple3<K1, K2, T>> typeInfoWithKey = createTypeWithKey(key1, key2); TwoKeyExtractingMapper<T, K1, K2> extractor = diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java index 0da5e01..7d3c0d6 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java +++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java @@ -62,4 +62,11 @@ public class UnionOperator<T> extends TwoInputOperator<T, T, T, UnionOperator<T> protected Union<T> translateToDataFlow(Operator<T> input1, Operator<T> input2) { return new Union<T>(input1, input2, unionLocationName); } + + @Override + public UnionOperator<T> setParallelism(int parallelism) { + // Union is not translated to an independent operator but executed by multiplexing + // its input on the following operator. Hence, the parallelism of a Union cannot be set. + throw new UnsupportedOperationException("Cannot set the parallelism for Union."); + } } diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/UnionTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/UnionTranslationTest.java new file mode 100644 index 0000000..d393109 --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/UnionTranslationTest.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.operators.translation; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.operators.GenericDataSinkBase; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.common.operators.SingleInputOperator; +import org.apache.flink.api.common.operators.Union; +import org.apache.flink.api.common.operators.base.MapOperatorBase; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.StringValue; +import org.apache.flink.util.Collector; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for translation of union operation. 
+ */ +@SuppressWarnings("serial") +public class UnionTranslationTest { + + @Test + public void translateUnion2Group() { + try { + final int parallelism = 4; + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism); + + DataSet<Tuple3<Double, StringValue, LongValue>> dataset1 = getSourceDataSet(env, 3); + + DataSet<Tuple3<Double, StringValue, LongValue>> dataset2 = getSourceDataSet(env, 2); + + dataset1.union(dataset2) + .groupBy(new KeySelector<Tuple3<Double, StringValue, LongValue>, String>() { + @Override + public String getKey(Tuple3<Double, StringValue, LongValue> value) throws Exception { + return ""; + } + }) + .reduceGroup(new GroupReduceFunction<Tuple3<Double, StringValue, LongValue>, String>() { + @Override + public void reduce(Iterable<Tuple3<Double, StringValue, LongValue>> values, Collector<String> out) throws Exception { + } + }) + .returns(String.class) + .output(new DiscardingOutputFormat<>()); + + Plan p = env.createProgramPlan(); + + // The plan should look like the following one. + // + // DataSet1(3) - MapOperator(3)-+ + // |- Union(-1) - SingleInputOperator - Sink + // DataSet2(2) - MapOperator(2)-+ + + GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next(); + Union unionOperator = (Union) ((SingleInputOperator) sink.getInput()).getInput(); + + // The key mappers should be added to both of the two input streams for union. + assertTrue(unionOperator.getFirstInput() instanceof MapOperatorBase<?, ?, ?>); + assertTrue(unionOperator.getSecondInput() instanceof MapOperatorBase<?, ?, ?>); + + // The parallelisms of the key mappers should be equal to those of their inputs. + assertEquals(unionOperator.getFirstInput().getParallelism(), 3); + assertEquals(unionOperator.getSecondInput().getParallelism(), 2); + + // The union should always have the default parallelism. 
+ assertEquals(unionOperator.getParallelism(), ExecutionConfig.PARALLELISM_DEFAULT); + } + catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail("Test caused an error: " + e.getMessage()); + } + } + + @Test + public void translateUnion3SortedGroup() { + try { + final int parallelism = 4; + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism); + + DataSet<Tuple3<Double, StringValue, LongValue>> dataset1 = getSourceDataSet(env, 2); + + DataSet<Tuple3<Double, StringValue, LongValue>> dataset2 = getSourceDataSet(env, 3); + + DataSet<Tuple3<Double, StringValue, LongValue>> dataset3 = getSourceDataSet(env, -1); + + dataset1.union(dataset2).union(dataset3) + .groupBy(new KeySelector<Tuple3<Double, StringValue, LongValue>, String>() { + @Override + public String getKey(Tuple3<Double, StringValue, LongValue> value) throws Exception { + return ""; + } + }) + .sortGroup(new KeySelector<Tuple3<Double, StringValue, LongValue>, String>() { + @Override + public String getKey(Tuple3<Double, StringValue, LongValue> value) throws Exception { + return ""; + } + }, Order.ASCENDING) + .reduceGroup(new GroupReduceFunction<Tuple3<Double, StringValue, LongValue>, String>() { + @Override + public void reduce(Iterable<Tuple3<Double, StringValue, LongValue>> values, Collector<String> out) throws Exception { + } + }) + .returns(String.class) + .output(new DiscardingOutputFormat<>()); + + Plan p = env.createProgramPlan(); + + // The plan should look like the following one. + // + // DataSet1(2) - MapOperator(2)-+ + // |- Union(-1) -+ + // DataSet2(3) - MapOperator(3)-+ |- Union(-1) - SingleInputOperator - Sink + // | + // DataSet3(-1) - MapOperator(-1)-+ + + GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next(); + Union secondUnionOperator = (Union) ((SingleInputOperator) sink.getInput()).getInput(); + + // The first input of the second union should be the first union. 
+ Union firstUnionOperator = (Union) secondUnionOperator.getFirstInput(); + + // The key mapper should be added to the second input stream of the second union. + assertTrue(secondUnionOperator.getSecondInput() instanceof MapOperatorBase<?, ?, ?>); + + // The key mappers should be added to both of the two input streams for the first union. + assertTrue(firstUnionOperator.getFirstInput() instanceof MapOperatorBase<?, ?, ?>); + assertTrue(firstUnionOperator.getSecondInput() instanceof MapOperatorBase<?, ?, ?>); + + // The parallelisms of the key mappers should be equal to those of their inputs. + assertEquals(firstUnionOperator.getFirstInput().getParallelism(), 2); + assertEquals(firstUnionOperator.getSecondInput().getParallelism(), 3); + assertEquals(secondUnionOperator.getSecondInput().getParallelism(), -1); + + // The union should always have the default parallelism. + assertEquals(secondUnionOperator.getParallelism(), ExecutionConfig.PARALLELISM_DEFAULT); + assertEquals(firstUnionOperator.getParallelism(), ExecutionConfig.PARALLELISM_DEFAULT); + } + catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail("Test caused an error: " + e.getMessage()); + } + } + + @SuppressWarnings("unchecked") + private static DataSet<Tuple3<Double, StringValue, LongValue>> getSourceDataSet(ExecutionEnvironment env, int parallelism) { + return env + .fromElements(new Tuple3<>(0.0, new StringValue(""), new LongValue(1L))) + .setParallelism(parallelism); + } +} diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java index d259fac..932ad78 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java @@ -235,7 +235,6 @@ extends 
GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { // s, adjusted pagerank(s) DataSet<Tuple2<K, DoubleValue>> adjustedScores = vertexScores .union(sourceVertices) - .setParallelism(parallelism) .name("Union with source vertices") .map(new AdjustScores<>(dampingFactor)) .withBroadcastSet(sumOfScores, SUM_OF_SCORES) diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java index 106ac7a..44c79de 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java @@ -530,7 +530,7 @@ public class PythonPlanBinder { private <IN> void createUnionOperation(PythonOperationInfo info) { DataSet<IN> op1 = sets.getDataSet(info.parentID); DataSet<IN> op2 = sets.getDataSet(info.otherID); - sets.add(info.setID, op1.union(op2).setParallelism(info.parallelism).name("Union")); + sets.add(info.setID, op1.union(op2).name("Union")); } private <IN1, IN2, OUT> void createCoGroupOperation(PythonOperationInfo info, TypeInformation<OUT> type) {