[FLINK-6229] [py] Rework setup of PythonPlanBinder

- make file/argument split more readable
- pass on Paths where applicable instead of recreating them every time
- rename PPB#clearPath to more appropriate deleteIfExists
- simplify PPB#copyFile
- simplify PPB#startPython
- use UUID#randomUUID() instead of Random.nextInt()
- remove several invalid exception declarations
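
For illustration, the reworked file/argument split can be reproduced in isolation. The following is a minimal, standalone Java sketch that mirrors the new runPlan() logic from the diff below; the class name and the invocation values are hypothetical.

    import java.util.Arrays;

    public class ArgumentSplitSketch {
        public static void main(String[] args) {
            // Hypothetical invocation: plan file, two files to ship, "-", plan arguments.
            String[] argv = {"plan.py", "lib.py", "data.txt", "-", "--input", "foo"};

            // Same logic as the reworked runPlan(): find the first "-" separator.
            int split = 0;
            for (int x = 0; x < argv.length; x++) {
                if (argv[x].equals("-")) {
                    split = x;
                    break;
                }
            }

            // Everything before the separator is a file to copy, everything after
            // it is passed through to the Python plan; with no separator there
            // are no plan arguments.
            String planFile = argv[0];
            String[] filesToCopy = Arrays.copyOfRange(argv, 1, split == 0 ? argv.length : split);
            String[] planArguments = Arrays.copyOfRange(argv, split == 0 ? argv.length : split + 1, argv.length);

            System.out.println("plan file:      " + planFile);                       // plan.py
            System.out.println("files to copy:  " + Arrays.toString(filesToCopy));   // [lib.py, data.txt]
            System.out.println("plan arguments: " + Arrays.toString(planArguments)); // [--input, foo]
        }
    }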


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5ff9c99f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5ff9c99f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5ff9c99f

Branch: refs/heads/table-retraction
Commit: 5ff9c99ff193725c125f2b4c450411db97b6f3b4
Parents: bdcebfd
Author: zentol <ches...@apache.org>
Authored: Fri Mar 31 12:11:40 2017 +0200
Committer: zentol <ches...@apache.org>
Committed: Thu Apr 6 10:57:11 2017 +0200

----------------------------------------------------------------------
 .../flink/python/api/PythonPlanBinder.java      | 283 ++++++++-----------
 1 file changed, 124 insertions(+), 159 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5ff9c99f/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index b6181b4..2378d60 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -49,9 +49,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.net.URISyntaxException;
 import java.util.Arrays;
-import java.util.Random;
+import java.util.UUID;
 
 import static org.apache.flink.python.api.PythonOperationInfo.DatasizeHint.HUGE;
 import static org.apache.flink.python.api.PythonOperationInfo.DatasizeHint.NONE;
@@ -68,9 +67,6 @@ public class PythonPlanBinder {
 
        public static final String PLANBINDER_CONFIG_BCVAR_COUNT = "PLANBINDER_BCVAR_COUNT";
        public static final String PLANBINDER_CONFIG_BCVAR_NAME_PREFIX = "PLANBINDER_BCVAR_";
-
-       private static final Random r = new Random();
-
        public static final String PLAN_ARGUMENTS_KEY = "python.plan.arguments";
 
        private static final String FLINK_PYTHON_REL_LOCAL_PATH = File.separator + "resources" + File.separator + "python";
@@ -80,10 +76,9 @@ public class PythonPlanBinder {
        private final String pythonLibraryPath;
 
        private final String tmpPlanFilesDir;
-       private String tmpDistributedDir;
+       private Path tmpDistributedDir;
 
        private final SetCache sets = new SetCache();
-       public ExecutionEnvironment env;
        private int currentEnvironmentID = 0;
        private PythonPlanStreamer streamer;
 
@@ -108,9 +103,9 @@ public class PythonPlanBinder {
                String configuredPlanTmpPath = globalConfig.getString(PythonOptions.PLAN_TMP_DIR);
                tmpPlanFilesDir = configuredPlanTmpPath != null
                        ? configuredPlanTmpPath
-                       : System.getProperty("java.io.tmpdir") + File.separator + "flink_plan_" + r.nextInt();
+                       : System.getProperty("java.io.tmpdir") + File.separator + "flink_plan_" + UUID.randomUUID();
 
-               tmpDistributedDir = globalConfig.getString(PythonOptions.DC_TMP_DIR);
+               tmpDistributedDir = new Path(globalConfig.getString(PythonOptions.DC_TMP_DIR));
                
                String flinkRootDir = System.getenv("FLINK_ROOT_DIR");
                pythonLibraryPath = flinkRootDir != null
@@ -131,121 +126,95 @@ public class PythonPlanBinder {
        void runPlan(String[] args) throws Exception {
                int split = 0;
                for (int x = 0; x < args.length; x++) {
-                       if (args[x].compareTo("-") == 0) {
+                       if (args[x].equals("-")) {
                                split = x;
+                               break;
                        }
                }
 
                try {
-                       String tmpPath = tmpPlanFilesDir;
-                       prepareFiles(tmpPath, Arrays.copyOfRange(args, 0, split == 0 ? args.length : split));
-                       startPython(tmpPath, Arrays.copyOfRange(args, split == 0 ? args.length : split + 1, args.length));
+                       String planFile = args[0];
+                       String[] filesToCopy = Arrays.copyOfRange(args, 1, split == 0 ? args.length : split);
+                       String[] planArgumentsArray = Arrays.copyOfRange(args, split == 0 ? args.length : split + 1, args.length);
+
+                       StringBuilder planArgumentsBuilder = new StringBuilder();
+                       for (String arg : planArgumentsArray) {
+                               planArgumentsBuilder.append(" ").append(arg);
+                       }
+                       String planArguments = planArgumentsBuilder.toString();
+
+                       operatorConfig.setString(PLAN_ARGUMENTS_KEY, planArguments);
+
+                       // copy flink library, plan file and additional files to temporary location
+                       Path tmpPlanFilesPath = new Path(tmpPlanFilesDir);
+                       deleteIfExists(tmpPlanFilesPath);
+                       FileCache.copy(new Path(pythonLibraryPath), tmpPlanFilesPath, false);
+                       copyFile(new Path(planFile), tmpPlanFilesPath, FLINK_PYTHON_PLAN_NAME);
+                       for (String file : filesToCopy) {
+                               Path source = new Path(file);
+                               copyFile(source, tmpPlanFilesPath, source.getName());
+                       }
+
+                       // start python process
+                       streamer = new PythonPlanStreamer(operatorConfig);
+                       streamer.open(tmpPlanFilesDir, planArguments);
 
                        // Python process should terminate itself when all jobs have been run
                        while (streamer.preparePlanMode()) {
-                               receivePlan();
+                               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+                               receivePlan(env);
+
+                               // upload files to remote FS and register on Distributed Cache
+                               deleteIfExists(tmpDistributedDir);
+                               FileCache.copy(tmpPlanFilesPath, tmpDistributedDir, true);
+                               env.registerCachedFile(tmpDistributedDir.toUri().toString(), FLINK_PYTHON_DC_ID);
 
-                               distributeFiles(tmpPath, env);
                                JobExecutionResult jer = env.execute();
-                               sendResult(jer);
+                               long runtime = jer.getNetRuntime();
+                               streamer.sendRecord(runtime);
 
                                streamer.finishPlanMode();
+                               sets.reset();
+                       }
+               } finally {
+                       try {
+                               // clean up created files
+                               FileSystem distributedFS = tmpDistributedDir.getFileSystem();
+                               distributedFS.delete(tmpDistributedDir, true);
+
+                               FileSystem local = FileSystem.getLocalFileSystem();
+                               local.delete(new Path(tmpPlanFilesDir), true);
+                       } catch (IOException ioe) {
+                               LOG.error("PythonAPI file cleanup failed. {}", ioe.getMessage());
+                       } finally {
+                               if (streamer != null) {
+                                       streamer.close();
+                               }
                        }
-
-                       clearPath(tmpPath);
-                       close();
-               } catch (Exception e) {
-                       close();
-                       throw e;
                }
        }
 
        //=====Setup========================================================================================================
 
-       /**
-        * Copies all files to a common directory {@link PythonOptions#PLAN_TMP_DIR}). This allows us to distribute it as
-        * one big package which resolves PYTHONPATH issues.
-        *
-        * @param filePaths
-        * @throws IOException
-        * @throws URISyntaxException
-        */
-       private void prepareFiles(String tempFilePath, String... filePaths) throws IOException, URISyntaxException {
-               //Flink python package
-               clearPath(tempFilePath);
-               FileCache.copy(new Path(pythonLibraryPath), new Path(tmpPlanFilesDir), false);
-
-               //plan file             
-               copyFile(filePaths[0], tempFilePath, FLINK_PYTHON_PLAN_NAME);
-
-               //additional files/folders
-               for (int x = 1; x < filePaths.length; x++) {
-                       copyFile(filePaths[x], tempFilePath, null);
+       private static void deleteIfExists(Path path) throws IOException {
+               FileSystem fs = path.getFileSystem();
+               if (fs.exists(path)) {
+                       fs.delete(path, true);
                }
        }
 
-       private static void clearPath(String path) throws IOException {
-               FileSystem fs = FileSystem.get(new Path(path).toUri());
-               if (fs.exists(new Path(path))) {
-                       fs.delete(new Path(path), true);
-               }
-       }
-
-       private static void copyFile(String path, String target, String name) throws IOException, URISyntaxException {
-               if (path.endsWith("/")) {
-                       path = path.substring(0, path.length() - 1);
-               }
-               String identifier = name == null ? path.substring(path.lastIndexOf("/")) : name;
-               String tmpFilePath = target + "/" + identifier;
-               clearPath(tmpFilePath);
-               Path p = new Path(path);
-               FileCache.copy(p.makeQualified(FileSystem.get(p.toUri())), new Path(tmpFilePath), true);
-       }
-
-       private void distributeFiles(String tmpPath, ExecutionEnvironment env) throws IOException {
-               clearPath(tmpDistributedDir);
-               FileCache.copy(new Path(tmpPath), new Path(tmpDistributedDir), true);
-               env.registerCachedFile(new Path(tmpDistributedDir).toUri().toString(), FLINK_PYTHON_DC_ID);
-       }
-
-       private void startPython(String tempPath, String[] args) throws IOException {
-               StringBuilder arguments = new StringBuilder();
-               for (String arg : args) {
-                       arguments.append(" ").append(arg);
-               }
-
-               operatorConfig.setString(PLAN_ARGUMENTS_KEY, arguments.toString());
-
-               streamer = new PythonPlanStreamer(operatorConfig);
-               streamer.open(tempPath, arguments.toString());
-       }
-
-       private void sendResult(JobExecutionResult jer) throws IOException {
-               long runtime = jer.getNetRuntime();
-               streamer.sendRecord(runtime);
-       }
-
-       private void close() {
-               try { //prevent throwing exception so that previous exceptions aren't hidden.
-                       FileSystem hdfs = new Path(tmpDistributedDir).getFileSystem();
-                       hdfs.delete(new Path(tmpDistributedDir), true);
-
-                       FileSystem local = FileSystem.getLocalFileSystem();
-                       local.delete(new Path(tmpPlanFilesDir), true);
-                       streamer.close();
-               } catch (NullPointerException ignored) {
-               } catch (IOException ioe) {
-                       LOG.error("PythonAPI file cleanup failed. {}", ioe.getMessage());
-               }
+       private static void copyFile(Path source, Path targetDirectory, String name) throws IOException {
+               Path targetFilePath = new Path(targetDirectory, name);
+               deleteIfExists(targetFilePath);
+               FileCache.copy(source, targetFilePath, true);
        }
 
        //====Plan==========================================================================================================
-       private void receivePlan() throws IOException {
-               env = ExecutionEnvironment.getExecutionEnvironment();
+       private void receivePlan(ExecutionEnvironment env) throws IOException {
                //IDs used in HashMap of sets are only unique for each environment
-               sets.reset();
-               receiveParameters();
-               receiveOperations();
+               receiveParameters(env);
+               receiveOperations(env);
        }
 
        //====Environment===================================================================================================
@@ -260,7 +229,7 @@ public class PythonPlanBinder {
                ID
        }
 
-       private void receiveParameters() throws IOException {
+       private void receiveParameters(ExecutionEnvironment env) throws IOException {
                for (int x = 0; x < 4; x++) {
                        Tuple value = (Tuple) streamer.getRecord(true);
                        switch (Parameters.valueOf(((String) value.getField(0)).toUpperCase())) {
@@ -271,7 +240,7 @@ public class PythonPlanBinder {
                                case MODE:
                                        if (value.<Boolean>getField(1)) {
                                                LOG.info("Local execution 
specified, using default for {}.", PythonOptions.DC_TMP_DIR);
-                                               tmpDistributedDir = 
PythonOptions.DC_TMP_DIR.defaultValue();
+                                               tmpDistributedDir = new 
Path(PythonOptions.DC_TMP_DIR.defaultValue());
                                        }
                                        break;
                                case RETRY:
@@ -301,23 +270,23 @@ public class PythonPlanBinder {
                COGROUP, CROSS, CROSS_H, CROSS_T, FILTER, FLATMAP, GROUPREDUCE, JOIN, JOIN_H, JOIN_T, MAP, REDUCE, MAPPARTITION
        }
 
-       private void receiveOperations() throws IOException {
+       private void receiveOperations(ExecutionEnvironment env) throws IOException {
                Integer operationCount = (Integer) streamer.getRecord(true);
                for (int x = 0; x < operationCount; x++) {
                        PythonOperationInfo info = new PythonOperationInfo(streamer, currentEnvironmentID);
                        Operation op = Operation.valueOf(info.identifier.toUpperCase());
                        switch (op) {
                                case SOURCE_CSV:
-                                       createCsvSource(info);
+                                       createCsvSource(env, info);
                                        break;
                                case SOURCE_TEXT:
-                                       createTextSource(info);
+                                       createTextSource(env, info);
                                        break;
                                case SOURCE_VALUE:
-                                       createValueSource(info);
+                                       createValueSource(env, info);
                                        break;
                                case SOURCE_SEQ:
-                                       createSequenceSource(info);
+                                       createSequenceSource(env, info);
                                        break;
                                case SINK_CSV:
                                        createCsvSink(info);
@@ -395,12 +364,8 @@ public class PythonPlanBinder {
                }
        }
 
-       private int getParallelism(PythonOperationInfo info) {
-               return info.parallelism == -1 ? env.getParallelism() : info.parallelism;
-       }
-
        @SuppressWarnings("unchecked")
-       private <T extends Tuple> void createCsvSource(PythonOperationInfo info) {
+       private <T extends Tuple> void createCsvSource(ExecutionEnvironment env, PythonOperationInfo info) {
                if (!(info.types instanceof TupleTypeInfo)) {
                        throw new RuntimeException("The output type of a csv source has to be a tuple. The derived type is " + info);
                }
@@ -408,41 +373,41 @@ public class PythonPlanBinder {
                String lineD = info.lineDelimiter;
                String fieldD = info.fieldDelimiter;
                TupleTypeInfo<T> types = (TupleTypeInfo<T>) info.types;
-               sets.add(info.setID, env.createInput(new TupleCsvInputFormat<>(path, lineD, fieldD, types), types).setParallelism(getParallelism(info)).name("CsvSource")
-                       .map(new SerializerMap<T>()).setParallelism(getParallelism(info)).name("CsvSourcePostStep"));
+               sets.add(info.setID, env.createInput(new TupleCsvInputFormat<>(path, lineD, fieldD, types), types).setParallelism(info.parallelism).name("CsvSource")
+                       .map(new SerializerMap<T>()).setParallelism(info.parallelism).name("CsvSourcePostStep"));
        }
 
-       private void createTextSource(PythonOperationInfo info) {
-               sets.add(info.setID, env.readTextFile(info.path).setParallelism(getParallelism(info)).name("TextSource")
-                       .map(new SerializerMap<String>()).setParallelism(getParallelism(info)).name("TextSourcePostStep"));
+       private void createTextSource(ExecutionEnvironment env, PythonOperationInfo info) {
+               sets.add(info.setID, env.readTextFile(info.path).setParallelism(info.parallelism).name("TextSource")
+                       .map(new SerializerMap<String>()).setParallelism(info.parallelism).name("TextSourcePostStep"));
        }
 
-       private void createValueSource(PythonOperationInfo info) {
-               sets.add(info.setID, env.fromElements(info.values).setParallelism(getParallelism(info)).name("ValueSource")
-                       .map(new SerializerMap<>()).setParallelism(getParallelism(info)).name("ValueSourcePostStep"));
+       private void createValueSource(ExecutionEnvironment env, PythonOperationInfo info) {
+               sets.add(info.setID, env.fromElements(info.values).setParallelism(info.parallelism).name("ValueSource")
+                       .map(new SerializerMap<>()).setParallelism(info.parallelism).name("ValueSourcePostStep"));
        }
 
-       private void createSequenceSource(PythonOperationInfo info) {
-               sets.add(info.setID, env.generateSequence(info.frm, info.to).setParallelism(getParallelism(info)).name("SequenceSource")
-                       .map(new SerializerMap<Long>()).setParallelism(getParallelism(info)).name("SequenceSourcePostStep"));
+       private void createSequenceSource(ExecutionEnvironment env, PythonOperationInfo info) {
+               sets.add(info.setID, env.generateSequence(info.frm, info.to).setParallelism(info.parallelism).name("SequenceSource")
+                       .map(new SerializerMap<Long>()).setParallelism(info.parallelism).name("SequenceSourcePostStep"));
        }
 
        private void createCsvSink(PythonOperationInfo info) {
                DataSet<byte[]> parent = sets.getDataSet(info.parentID);
-               parent.map(new StringTupleDeserializerMap()).setParallelism(getParallelism(info)).name("CsvSinkPreStep")
-                               .writeAsCsv(info.path, info.lineDelimiter, info.fieldDelimiter, info.writeMode).setParallelism(getParallelism(info)).name("CsvSink");
+               parent.map(new StringTupleDeserializerMap()).setParallelism(info.parallelism).name("CsvSinkPreStep")
+                               .writeAsCsv(info.path, info.lineDelimiter, info.fieldDelimiter, info.writeMode).setParallelism(info.parallelism).name("CsvSink");
        }
 
        private void createTextSink(PythonOperationInfo info) {
                DataSet<byte[]> parent = sets.getDataSet(info.parentID);
-               parent.map(new StringDeserializerMap()).setParallelism(getParallelism(info))
-                       .writeAsText(info.path, info.writeMode).setParallelism(getParallelism(info)).name("TextSink");
+               parent.map(new StringDeserializerMap()).setParallelism(info.parallelism)
+                       .writeAsText(info.path, info.writeMode).setParallelism(info.parallelism).name("TextSink");
        }
 
        private void createPrintSink(PythonOperationInfo info) {
                DataSet<byte[]> parent = sets.getDataSet(info.parentID);
-               parent.map(new StringDeserializerMap()).setParallelism(getParallelism(info)).name("PrintSinkPreStep")
-                       .output(new PrintingOutputFormat<String>(info.toError)).setParallelism(getParallelism(info));
+               parent.map(new StringDeserializerMap()).setParallelism(info.parallelism).name("PrintSinkPreStep")
+                       .output(new PrintingOutputFormat<String>(info.toError)).setParallelism(info.parallelism);
        }
 
        private void createBroadcastVariable(PythonOperationInfo info) {
@@ -466,8 +431,8 @@ public class PythonPlanBinder {
        private <K extends Tuple> void createDistinctOperation(PythonOperationInfo info) {
                DataSet<Tuple2<K, byte[]>> op = sets.getDataSet(info.parentID);
                DataSet<byte[]> result = op
-                       .distinct(info.keys).setParallelism(getParallelism(info)).name("Distinct")
-                       .map(new KeyDiscarder<K>()).setParallelism(getParallelism(info)).name("DistinctPostStep");
+                       .distinct(info.keys).setParallelism(info.parallelism).name("Distinct")
+                       .map(new KeyDiscarder<K>()).setParallelism(info.parallelism).name("DistinctPostStep");
                sets.add(info.setID, result);
        }
 
@@ -475,17 +440,17 @@ public class PythonPlanBinder {
                if (sets.isDataSet(info.parentID)) {
                        DataSet<byte[]> op = sets.getDataSet(info.parentID);
                        sets.add(info.setID, op
-                               .first(info.count).setParallelism(getParallelism(info)).name("First"));
+                               .first(info.count).setParallelism(info.parallelism).name("First"));
                } else if (sets.isUnsortedGrouping(info.parentID)) {
                        UnsortedGrouping<Tuple2<K, byte[]>> op = sets.getUnsortedGrouping(info.parentID);
                        sets.add(info.setID, op
-                               .first(info.count).setParallelism(getParallelism(info)).name("First")
-                               .map(new KeyDiscarder<K>()).setParallelism(getParallelism(info)).name("FirstPostStep"));
+                               .first(info.count).setParallelism(info.parallelism).name("First")
+                               .map(new KeyDiscarder<K>()).setParallelism(info.parallelism).name("FirstPostStep"));
                } else if (sets.isSortedGrouping(info.parentID)) {
                        SortedGrouping<Tuple2<K, byte[]>> op = sets.getSortedGrouping(info.parentID);
                        sets.add(info.setID, op
-                               .first(info.count).setParallelism(getParallelism(info)).name("First")
-                               .map(new KeyDiscarder<K>()).setParallelism(getParallelism(info)).name("FirstPostStep"));
+                               .first(info.count).setParallelism(info.parallelism).name("First")
+                               .map(new KeyDiscarder<K>()).setParallelism(info.parallelism).name("FirstPostStep"));
                }
        }
 
@@ -497,14 +462,14 @@ public class PythonPlanBinder {
        private <K extends Tuple> void createHashPartitionOperation(PythonOperationInfo info) {
                DataSet<Tuple2<K, byte[]>> op1 = sets.getDataSet(info.parentID);
                DataSet<byte[]> result = op1
-                       .partitionByHash(info.keys).setParallelism(getParallelism(info))
-                       .map(new KeyDiscarder<K>()).setParallelism(getParallelism(info)).name("HashPartitionPostStep");
+                       .partitionByHash(info.keys).setParallelism(info.parallelism)
+                       .map(new KeyDiscarder<K>()).setParallelism(info.parallelism).name("HashPartitionPostStep");
                sets.add(info.setID, result);
        }
 
        private void createRebalanceOperation(PythonOperationInfo info) {
                DataSet<?> op = sets.getDataSet(info.parentID);
-               sets.add(info.setID, op.rebalance().setParallelism(getParallelism(info)).name("Rebalance"));
+               sets.add(info.setID, op.rebalance().setParallelism(info.parallelism).name("Rebalance"));
        }
 
        private void createSortOperation(PythonOperationInfo info) {
@@ -520,7 +485,7 @@ public class PythonPlanBinder {
        private <IN> void createUnionOperation(PythonOperationInfo info) {
                DataSet<IN> op1 = sets.getDataSet(info.parentID);
                DataSet<IN> op2 = sets.getDataSet(info.otherID);
-               sets.add(info.setID, op1.union(op2).setParallelism(getParallelism(info)).name("Union"));
+               sets.add(info.setID, op1.union(op2).setParallelism(info.parallelism).name("Union"));
        }
 
        private <IN1, IN2, OUT> void createCoGroupOperation(PythonOperationInfo info, TypeInformation<OUT> type) {
@@ -529,7 +494,7 @@ public class PythonPlanBinder {
                Keys.ExpressionKeys<IN1> key1 = new Keys.ExpressionKeys<>(info.keys1, op1.getType());
                Keys.ExpressionKeys<IN2> key2 = new Keys.ExpressionKeys<>(info.keys2, op2.getType());
                PythonCoGroup<IN1, IN2, OUT> pcg = new PythonCoGroup<>(operatorConfig, info.envID, info.setID, type);
-               sets.add(info.setID, new CoGroupRawOperator<>(op1, op2, key1, key2, pcg, type, info.name).setParallelism(getParallelism(info)));
+               sets.add(info.setID, new CoGroupRawOperator<>(op1, op2, key1, key2, pcg, type, info.name).setParallelism(info.parallelism));
        }
 
        private <IN1, IN2, OUT> void createCrossOperation(DatasizeHint mode, PythonOperationInfo info, TypeInformation<OUT> type) {
@@ -551,11 +516,11 @@ public class PythonPlanBinder {
                                throw new IllegalArgumentException("Invalid Cross mode specified: " + mode);
                }
 
-               defaultResult.setParallelism(getParallelism(info));
+               defaultResult.setParallelism(info.parallelism);
                if (info.usesUDF) {
                        sets.add(info.setID, defaultResult
                                .mapPartition(new PythonMapPartition<Tuple2<IN1, IN2>, OUT>(operatorConfig, info.envID, info.setID, type))
-                               .setParallelism(getParallelism(info)).name(info.name));
+                               .setParallelism(info.parallelism).name(info.name));
                } else {
                        sets.add(info.setID, defaultResult.name("DefaultCross"));
                }
@@ -565,14 +530,14 @@ public class PythonPlanBinder {
                DataSet<IN> op1 = sets.getDataSet(info.parentID);
                sets.add(info.setID, op1
                        .mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type))
-                       .setParallelism(getParallelism(info)).name(info.name));
+                       .setParallelism(info.parallelism).name(info.name));
        }
 
        private <IN, OUT> void createFlatMapOperation(PythonOperationInfo info, TypeInformation<OUT> type) {
                DataSet<IN> op1 = sets.getDataSet(info.parentID);
                sets.add(info.setID, op1
                        .mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type))
-                       .setParallelism(getParallelism(info)).name(info.name));
+                       .setParallelism(info.parallelism).name(info.name));
        }
 
        private void createGroupReduceOperation(PythonOperationInfo info) {
@@ -587,23 +552,23 @@ public class PythonPlanBinder {
 
        private <IN, OUT> DataSet<OUT> applyGroupReduceOperation(DataSet<IN> op1, PythonOperationInfo info, TypeInformation<OUT> type) {
                return op1
-                       .reduceGroup(new IdentityGroupReduce<IN>()).setCombinable(false).name("PythonGroupReducePreStep").setParallelism(getParallelism(info))
+                       .reduceGroup(new IdentityGroupReduce<IN>()).setCombinable(false).name("PythonGroupReducePreStep").setParallelism(info.parallelism)
                        .mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type))
-                       .setParallelism(getParallelism(info)).name(info.name);
+                       .setParallelism(info.parallelism).name(info.name);
        }
 
        private <IN, OUT> DataSet<OUT> applyGroupReduceOperation(UnsortedGrouping<IN> op1, PythonOperationInfo info, TypeInformation<OUT> type) {
                return op1
-                       .reduceGroup(new IdentityGroupReduce<IN>()).setCombinable(false).setParallelism(getParallelism(info)).name("PythonGroupReducePreStep")
+                       .reduceGroup(new IdentityGroupReduce<IN>()).setCombinable(false).setParallelism(info.parallelism).name("PythonGroupReducePreStep")
                        .mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type))
-                       .setParallelism(getParallelism(info)).name(info.name);
+                       .setParallelism(info.parallelism).name(info.name);
        }
 
        private <IN, OUT> DataSet<OUT> applyGroupReduceOperation(SortedGrouping<IN> op1, PythonOperationInfo info, TypeInformation<OUT> type) {
                return op1
-                       .reduceGroup(new IdentityGroupReduce<IN>()).setCombinable(false).setParallelism(getParallelism(info)).name("PythonGroupReducePreStep")
+                       .reduceGroup(new IdentityGroupReduce<IN>()).setCombinable(false).setParallelism(info.parallelism).name("PythonGroupReducePreStep")
                        .mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type))
-                       .setParallelism(getParallelism(info)).name(info.name);
+                       .setParallelism(info.parallelism).name(info.name);
        }
 
        private <IN1, IN2, OUT> void createJoinOperation(DatasizeHint mode, PythonOperationInfo info, TypeInformation<OUT> type) {
@@ -611,11 +576,11 @@ public class PythonPlanBinder {
                DataSet<IN2> op2 = sets.getDataSet(info.otherID);
 
                if (info.usesUDF) {
-                       sets.add(info.setID, createDefaultJoin(op1, op2, info.keys1, info.keys2, mode, getParallelism(info))
+                       sets.add(info.setID, createDefaultJoin(op1, op2, info.keys1, info.keys2, mode, info.parallelism)
                                .mapPartition(new PythonMapPartition<Tuple2<byte[], byte[]>, OUT>(operatorConfig, info.envID, info.setID, type))
-                               .setParallelism(getParallelism(info)).name(info.name));
+                               .setParallelism(info.parallelism).name(info.name));
                } else {
-                       sets.add(info.setID, createDefaultJoin(op1, op2, info.keys1, info.keys2, mode, getParallelism(info)));
+                       sets.add(info.setID, createDefaultJoin(op1, op2, info.keys1, info.keys2, mode, info.parallelism));
                }
        }
 
@@ -642,14 +607,14 @@ public class PythonPlanBinder {
                DataSet<IN> op1 = sets.getDataSet(info.parentID);
                sets.add(info.setID, op1
                        .mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type))
-                       .setParallelism(getParallelism(info)).name(info.name));
+                       .setParallelism(info.parallelism).name(info.name));
        }
 
        private <IN, OUT> void createMapPartitionOperation(PythonOperationInfo info, TypeInformation<OUT> type) {
                DataSet<IN> op1 = sets.getDataSet(info.parentID);
                sets.add(info.setID, op1
                        .mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type))
-                       .setParallelism(getParallelism(info)).name(info.name));
+                       .setParallelism(info.parallelism).name(info.name));
        }
 
        private void createReduceOperation(PythonOperationInfo info) {
@@ -664,15 +629,15 @@ public class PythonPlanBinder {
 
        private <IN, OUT> DataSet<OUT> applyReduceOperation(DataSet<IN> op1, PythonOperationInfo info, TypeInformation<OUT> type) {
                return op1
-                       .reduceGroup(new IdentityGroupReduce<IN>()).setCombinable(false).setParallelism(getParallelism(info)).name("PythonReducePreStep")
+                       .reduceGroup(new IdentityGroupReduce<IN>()).setCombinable(false).setParallelism(info.parallelism).name("PythonReducePreStep")
                        .mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type))
-                       .setParallelism(getParallelism(info)).name(info.name);
+                       .setParallelism(info.parallelism).name(info.name);
        }
 
        private <IN, OUT> DataSet<OUT> applyReduceOperation(UnsortedGrouping<IN> op1, PythonOperationInfo info, TypeInformation<OUT> type) {
                return op1
-                       .reduceGroup(new IdentityGroupReduce<IN>()).setCombinable(false).setParallelism(getParallelism(info)).name("PythonReducePreStep")
+                       .reduceGroup(new IdentityGroupReduce<IN>()).setCombinable(false).setParallelism(info.parallelism).name("PythonReducePreStep")
                        .mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type))
-                       .setParallelism(getParallelism(info)).name(info.name);
+                       .setParallelism(info.parallelism).name(info.name);
        }
 }
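
As a closing illustration of the deleteIfExists and UUID#randomUUID() changes above, here is a standalone sketch of both patterns. It substitutes java.nio.file for Flink's org.apache.flink.core.fs API so that it runs without a Flink dependency; the class name and helper layout are hypothetical, not part of the commit.

    import java.io.File;
    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.util.Comparator;
    import java.util.UUID;
    import java.util.stream.Stream;

    public class PlanBinderHelpersSketch {

        // Mirrors the patch: UUID#randomUUID() yields a directory suffix that is
        // effectively collision-free, unlike the previous Random#nextInt(), which
        // could repeat across runs (and could be negative).
        static String uniquePlanDir() {
            return System.getProperty("java.io.tmpdir") + File.separator + "flink_plan_" + UUID.randomUUID();
        }

        // java.nio stand-in for the renamed PPB#deleteIfExists: recursively remove
        // the path if it is present, do nothing otherwise. The committed code uses
        // Flink's FileSystem#exists and FileSystem#delete, as shown in the diff.
        static void deleteIfExists(Path path) throws IOException {
            if (Files.exists(path)) {
                try (Stream<Path> contents = Files.walk(path)) {
                    contents.sorted(Comparator.reverseOrder()).forEach(p -> {
                        try {
                            Files.delete(p);
                        } catch (IOException e) {
                            throw new UncheckedIOException(e);
                        }
                    });
                }
            }
        }

        public static void main(String[] args) throws IOException {
            Path dir = Paths.get(uniquePlanDir());
            Files.createDirectories(dir);
            System.out.println("created " + dir);
            deleteIfExists(dir);
            System.out.println("deleted " + dir);
        }
    }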
