This is an automated email from the ASF dual-hosted git repository.

xccui pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.5 by this push:
     new 9f1c12c  [FLINK-9289] [Dataset] Parallelism of generated operators 
should have the max parallelism of input
9f1c12c is described below

commit 9f1c12c10c3eb7b302f0688ed1b60fd08942dc03
Author: Xingcan Cui <xingc...@gmail.com>
AuthorDate: Sun May 13 20:20:36 2018 +0800

    [FLINK-9289] [Dataset] Parallelism of generated operators should have the 
max parallelism of input
    
    This closes #6003.
---
 .../flink/api/java/operators/KeyFunctions.java     |  27 +++
 .../flink/api/java/operators/UnionOperator.java    |   7 +
 .../translation/UnionTranslationTest.java          | 182 +++++++++++++++++++++
 .../flink/graph/library/linkanalysis/PageRank.java |   1 -
 .../apache/flink/python/api/PythonPlanBinder.java  |   2 +-
 5 files changed, 217 insertions(+), 2 deletions(-)

diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java
index f6336cd..3e7a552 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/KeyFunctions.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
+import org.apache.flink.api.common.operators.Union;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
@@ -43,6 +44,19 @@ public class KeyFunctions {
                        org.apache.flink.api.common.operators.Operator<T> input,
                        SelectorFunctionKeys<T, K> key) {
 
+               if (input instanceof Union) {
+                       // if input is a union, we apply the key extractors 
recursively to all inputs
+                       org.apache.flink.api.common.operators.Operator<T> 
firstInput = ((Union) input).getFirstInput();
+                       org.apache.flink.api.common.operators.Operator<T> 
secondInput = ((Union) input).getSecondInput();
+
+                       
org.apache.flink.api.common.operators.Operator<Tuple2<K, T>> firstInputWithKey =
+                                       appendKeyExtractor(firstInput, key);
+                       
org.apache.flink.api.common.operators.Operator<Tuple2<K, T>> secondInputWithKey 
=
+                                       appendKeyExtractor(secondInput, key);
+
+                       return new Union(firstInputWithKey, secondInputWithKey, 
input.getName());
+               }
+
                TypeInformation<T> inputType = key.getInputType();
                TypeInformation<Tuple2<K, T>> typeInfoWithKey = 
createTypeWithKey(key);
                KeyExtractingMapper<T, K> extractor = new 
KeyExtractingMapper(key.getKeyExtractor());
@@ -66,6 +80,19 @@ public class KeyFunctions {
                        SelectorFunctionKeys<T, K1> key1,
                        SelectorFunctionKeys<T, K2> key2) {
 
+               if (input instanceof Union) {
+                       // if input is a union, we apply the key extractors 
recursively to all inputs
+                       org.apache.flink.api.common.operators.Operator<T> 
firstInput = ((Union) input).getFirstInput();
+                       org.apache.flink.api.common.operators.Operator<T> 
secondInput = ((Union) input).getSecondInput();
+
+                       
org.apache.flink.api.common.operators.Operator<Tuple3<K1, K2, T>> 
firstInputWithKey =
+                                       appendKeyExtractor(firstInput, key1, 
key2);
+                       
org.apache.flink.api.common.operators.Operator<Tuple3<K1, K2, T>> 
secondInputWithKey =
+                                       appendKeyExtractor(secondInput, key1, 
key2);
+
+                       return new Union(firstInputWithKey, secondInputWithKey, 
input.getName());
+               }
+
                TypeInformation<T> inputType = key1.getInputType();
                TypeInformation<Tuple3<K1, K2, T>> typeInfoWithKey = 
createTypeWithKey(key1, key2);
                TwoKeyExtractingMapper<T, K1, K2> extractor =
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java
index 0da5e01..7d3c0d6 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnionOperator.java
@@ -62,4 +62,11 @@ public class UnionOperator<T> extends TwoInputOperator<T, T, 
T, UnionOperator<T>
        protected Union<T> translateToDataFlow(Operator<T> input1, Operator<T> 
input2) {
                return new Union<T>(input1, input2, unionLocationName);
        }
+
+       @Override
+       public UnionOperator<T> setParallelism(int parallelism) {
+               // Union is not translated to an independent operator but 
executed by multiplexing
+               // its input on the following operator. Hence, the parallelism 
of a Union cannot be set.
+               throw new UnsupportedOperationException("Cannot set the 
parallelism for Union.");
+       }
 }
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/UnionTranslationTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/UnionTranslationTest.java
new file mode 100644
index 0000000..d393109
--- /dev/null
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/UnionTranslationTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operators.translation;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.operators.GenericDataSinkBase;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.SingleInputOperator;
+import org.apache.flink.api.common.operators.Union;
+import org.apache.flink.api.common.operators.base.MapOperatorBase;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.StringValue;
+import org.apache.flink.util.Collector;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for translation of union operation.
+ */
+@SuppressWarnings("serial")
+public class UnionTranslationTest {
+
+       @Test
+       public void translateUnion2Group() {
+               try {
+                       final int parallelism = 4;
+                       ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment(parallelism);
+
+                       DataSet<Tuple3<Double, StringValue, LongValue>> 
dataset1 = getSourceDataSet(env, 3);
+
+                       DataSet<Tuple3<Double, StringValue, LongValue>> 
dataset2 = getSourceDataSet(env, 2);
+
+                       dataset1.union(dataset2)
+                                       .groupBy(new KeySelector<Tuple3<Double, 
StringValue, LongValue>, String>() {
+                                               @Override
+                                               public String 
getKey(Tuple3<Double, StringValue, LongValue> value) throws Exception {
+                                                       return "";
+                                               }
+                                       })
+                                       .reduceGroup(new 
GroupReduceFunction<Tuple3<Double, StringValue, LongValue>, String>() {
+                                               @Override
+                                               public void 
reduce(Iterable<Tuple3<Double, StringValue, LongValue>> values, 
Collector<String> out) throws Exception {
+                                               }
+                                       })
+                                       .returns(String.class)
+                                       .output(new DiscardingOutputFormat<>());
+
+                       Plan p = env.createProgramPlan();
+
+                       // The plan should look like the following one.
+                       //
+                       // DataSet1(3) - MapOperator(3)-+
+                       //                                  |- Union(-1) - 
SingleInputOperator - Sink
+                       // DataSet2(2) - MapOperator(2)-+
+
+                       GenericDataSinkBase<?> sink = 
p.getDataSinks().iterator().next();
+                       Union unionOperator = (Union) ((SingleInputOperator) 
sink.getInput()).getInput();
+
+                       // The key mappers should be added to both of the two 
input streams for union.
+                       assertTrue(unionOperator.getFirstInput() instanceof 
MapOperatorBase<?, ?, ?>);
+                       assertTrue(unionOperator.getSecondInput() instanceof 
MapOperatorBase<?, ?, ?>);
+
+                       // The parallelisms of the key mappers should be equal 
to those of their inputs.
+                       
assertEquals(unionOperator.getFirstInput().getParallelism(), 3);
+                       
assertEquals(unionOperator.getSecondInput().getParallelism(), 2);
+
+                       // The union should always have the default parallelism.
+                       assertEquals(unionOperator.getParallelism(), 
ExecutionConfig.PARALLELISM_DEFAULT);
+               }
+               catch (Exception e) {
+                       System.err.println(e.getMessage());
+                       e.printStackTrace();
+                       fail("Test caused an error: " + e.getMessage());
+               }
+       }
+
+       @Test
+       public void translateUnion3SortedGroup() {
+               try {
+                       final int parallelism = 4;
+                       ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment(parallelism);
+
+                       DataSet<Tuple3<Double, StringValue, LongValue>> 
dataset1 = getSourceDataSet(env, 2);
+
+                       DataSet<Tuple3<Double, StringValue, LongValue>> 
dataset2 = getSourceDataSet(env, 3);
+
+                       DataSet<Tuple3<Double, StringValue, LongValue>> 
dataset3 = getSourceDataSet(env, -1);
+
+                       dataset1.union(dataset2).union(dataset3)
+                                       .groupBy(new KeySelector<Tuple3<Double, 
StringValue, LongValue>, String>() {
+                                               @Override
+                                               public String 
getKey(Tuple3<Double, StringValue, LongValue> value) throws Exception {
+                                                       return "";
+                                               }
+                                       })
+                                       .sortGroup(new 
KeySelector<Tuple3<Double, StringValue, LongValue>, String>() {
+                                               @Override
+                                               public String 
getKey(Tuple3<Double, StringValue, LongValue> value) throws Exception {
+                                                       return "";
+                                               }
+                                       }, Order.ASCENDING)
+                                       .reduceGroup(new 
GroupReduceFunction<Tuple3<Double, StringValue, LongValue>, String>() {
+                                               @Override
+                                               public void 
reduce(Iterable<Tuple3<Double, StringValue, LongValue>> values, 
Collector<String> out) throws Exception {
+                                               }
+                                       })
+                                       .returns(String.class)
+                                       .output(new DiscardingOutputFormat<>());
+
+                       Plan p = env.createProgramPlan();
+
+                       // The plan should look like the following one.
+                       //
+                       // DataSet1(2) - MapOperator(2)-+
+                       //                                  |- Union(-1) -+
+                       // DataSet2(3) - MapOperator(3)-+             |- 
Union(-1) - SingleInputOperator - Sink
+                       //                                            |
+                       //             DataSet3(-1) - MapOperator(-1)-+
+
+                       GenericDataSinkBase<?> sink = 
p.getDataSinks().iterator().next();
+                       Union secondUnionOperator = (Union) 
((SingleInputOperator) sink.getInput()).getInput();
+
+                       // The first input of the second union should be the 
first union.
+                       Union firstUnionOperator = (Union) 
secondUnionOperator.getFirstInput();
+
+                       // The key mapper should be added to the second input 
stream of the second union.
+                       assertTrue(secondUnionOperator.getSecondInput() 
instanceof MapOperatorBase<?, ?, ?>);
+
+                       // The key mappers should be added to both of the two 
input streams for the first union.
+                       assertTrue(firstUnionOperator.getFirstInput() 
instanceof MapOperatorBase<?, ?, ?>);
+                       assertTrue(firstUnionOperator.getSecondInput() 
instanceof MapOperatorBase<?, ?, ?>);
+
+                       // The parallelisms of the key mappers should be equal 
to those of their inputs.
+                       
assertEquals(firstUnionOperator.getFirstInput().getParallelism(), 2);
+                       
assertEquals(firstUnionOperator.getSecondInput().getParallelism(), 3);
+                       
assertEquals(secondUnionOperator.getSecondInput().getParallelism(), -1);
+
+                       // The union should always have the default parallelism.
+                       assertEquals(secondUnionOperator.getParallelism(), 
ExecutionConfig.PARALLELISM_DEFAULT);
+                       assertEquals(firstUnionOperator.getParallelism(), 
ExecutionConfig.PARALLELISM_DEFAULT);
+               }
+               catch (Exception e) {
+                       System.err.println(e.getMessage());
+                       e.printStackTrace();
+                       fail("Test caused an error: " + e.getMessage());
+               }
+       }
+
+       @SuppressWarnings("unchecked")
+       private static DataSet<Tuple3<Double, StringValue, LongValue>> 
getSourceDataSet(ExecutionEnvironment env, int parallelism) {
+               return env
+                               .fromElements(new Tuple3<>(0.0, new 
StringValue(""), new LongValue(1L)))
+                               .setParallelism(parallelism);
+       }
+}
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
index d259fac..932ad78 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
@@ -235,7 +235,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> 
{
                // s, adjusted pagerank(s)
                DataSet<Tuple2<K, DoubleValue>> adjustedScores = vertexScores
                        .union(sourceVertices)
-                               .setParallelism(parallelism)
                                .name("Union with source vertices")
                        .map(new AdjustScores<>(dampingFactor))
                                .withBroadcastSet(sumOfScores, SUM_OF_SCORES)
diff --git 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index 106ac7a..44c79de 100644
--- 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -530,7 +530,7 @@ public class PythonPlanBinder {
        private <IN> void createUnionOperation(PythonOperationInfo info) {
                DataSet<IN> op1 = sets.getDataSet(info.parentID);
                DataSet<IN> op2 = sets.getDataSet(info.otherID);
-               sets.add(info.setID, 
op1.union(op2).setParallelism(info.parallelism).name("Union"));
+               sets.add(info.setID, op1.union(op2).name("Union"));
        }
 
        private <IN1, IN2, OUT> void createCoGroupOperation(PythonOperationInfo 
info, TypeInformation<OUT> type) {

Reply via email to