[FLINK-6229] [py] Rework setup of PythonPlanBinder

- make the file/argument split more readable
- pass Path objects where applicable instead of recreating them every time
- rename PPB#clearPath to the more appropriate deleteIfExists
- simplify PPB#copyFile
- simplify PPB#startPython
- use UUID#randomUUID() instead of Random#nextInt()
- remove several unnecessary (never-thrown) exception declarations
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5ff9c99f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5ff9c99f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5ff9c99f Branch: refs/heads/table-retraction Commit: 5ff9c99ff193725c125f2b4c450411db97b6f3b4 Parents: bdcebfd Author: zentol <ches...@apache.org> Authored: Fri Mar 31 12:11:40 2017 +0200 Committer: zentol <ches...@apache.org> Committed: Thu Apr 6 10:57:11 2017 +0200 ---------------------------------------------------------------------- .../flink/python/api/PythonPlanBinder.java | 283 ++++++++----------- 1 file changed, 124 insertions(+), 159 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5ff9c99f/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java index b6181b4..2378d60 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java @@ -49,9 +49,8 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; -import java.net.URISyntaxException; import java.util.Arrays; -import java.util.Random; +import java.util.UUID; import static org.apache.flink.python.api.PythonOperationInfo.DatasizeHint.HUGE; import static org.apache.flink.python.api.PythonOperationInfo.DatasizeHint.NONE; @@ -68,9 +67,6 @@ public class PythonPlanBinder { public static final String PLANBINDER_CONFIG_BCVAR_COUNT = "PLANBINDER_BCVAR_COUNT"; public static final String 
PLANBINDER_CONFIG_BCVAR_NAME_PREFIX = "PLANBINDER_BCVAR_"; - - private static final Random r = new Random(); - public static final String PLAN_ARGUMENTS_KEY = "python.plan.arguments"; private static final String FLINK_PYTHON_REL_LOCAL_PATH = File.separator + "resources" + File.separator + "python"; @@ -80,10 +76,9 @@ public class PythonPlanBinder { private final String pythonLibraryPath; private final String tmpPlanFilesDir; - private String tmpDistributedDir; + private Path tmpDistributedDir; private final SetCache sets = new SetCache(); - public ExecutionEnvironment env; private int currentEnvironmentID = 0; private PythonPlanStreamer streamer; @@ -108,9 +103,9 @@ public class PythonPlanBinder { String configuredPlanTmpPath = globalConfig.getString(PythonOptions.PLAN_TMP_DIR); tmpPlanFilesDir = configuredPlanTmpPath != null ? configuredPlanTmpPath - : System.getProperty("java.io.tmpdir") + File.separator + "flink_plan_" + r.nextInt(); + : System.getProperty("java.io.tmpdir") + File.separator + "flink_plan_" + UUID.randomUUID(); - tmpDistributedDir = globalConfig.getString(PythonOptions.DC_TMP_DIR); + tmpDistributedDir = new Path(globalConfig.getString(PythonOptions.DC_TMP_DIR)); String flinkRootDir = System.getenv("FLINK_ROOT_DIR"); pythonLibraryPath = flinkRootDir != null @@ -131,121 +126,95 @@ public class PythonPlanBinder { void runPlan(String[] args) throws Exception { int split = 0; for (int x = 0; x < args.length; x++) { - if (args[x].compareTo("-") == 0) { + if (args[x].equals("-")) { split = x; + break; } } try { - String tmpPath = tmpPlanFilesDir; - prepareFiles(tmpPath, Arrays.copyOfRange(args, 0, split == 0 ? args.length : split)); - startPython(tmpPath, Arrays.copyOfRange(args, split == 0 ? args.length : split + 1, args.length)); + String planFile = args[0]; + String[] filesToCopy = Arrays.copyOfRange(args, 1, split == 0 ? args.length : split); + String[] planArgumentsArray = Arrays.copyOfRange(args, split == 0 ? 
args.length : split + 1, args.length); + + StringBuilder planArgumentsBuilder = new StringBuilder(); + for (String arg : planArgumentsArray) { + planArgumentsBuilder.append(" ").append(arg); + } + String planArguments = planArgumentsBuilder.toString(); + + operatorConfig.setString(PLAN_ARGUMENTS_KEY, planArguments); + + // copy flink library, plan file and additional files to temporary location + Path tmpPlanFilesPath = new Path(tmpPlanFilesDir); + deleteIfExists(tmpPlanFilesPath); + FileCache.copy(new Path(pythonLibraryPath), tmpPlanFilesPath, false); + copyFile(new Path(planFile), tmpPlanFilesPath, FLINK_PYTHON_PLAN_NAME); + for (String file : filesToCopy) { + Path source = new Path(file); + copyFile(source, tmpPlanFilesPath, source.getName()); + } + + // start python process + streamer = new PythonPlanStreamer(operatorConfig); + streamer.open(tmpPlanFilesDir, planArguments); // Python process should terminate itself when all jobs have been run while (streamer.preparePlanMode()) { - receivePlan(); + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + receivePlan(env); + + // upload files to remote FS and register on Distributed Cache + deleteIfExists(tmpDistributedDir); + FileCache.copy(tmpPlanFilesPath, tmpDistributedDir, true); + env.registerCachedFile(tmpDistributedDir.toUri().toString(), FLINK_PYTHON_DC_ID); - distributeFiles(tmpPath, env); JobExecutionResult jer = env.execute(); - sendResult(jer); + long runtime = jer.getNetRuntime(); + streamer.sendRecord(runtime); streamer.finishPlanMode(); + sets.reset(); + } + } finally { + try { + // clean up created files + FileSystem distributedFS = tmpDistributedDir.getFileSystem(); + distributedFS.delete(tmpDistributedDir, true); + + FileSystem local = FileSystem.getLocalFileSystem(); + local.delete(new Path(tmpPlanFilesDir), true); + } catch (IOException ioe) { + LOG.error("PythonAPI file cleanup failed. 
{}", ioe.getMessage()); + } finally { + if (streamer != null) { + streamer.close(); + } } - - clearPath(tmpPath); - close(); - } catch (Exception e) { - close(); - throw e; } } //=====Setup======================================================================================================== - /** - * Copies all files to a common directory {@link PythonOptions#PLAN_TMP_DIR}). This allows us to distribute it as - * one big package which resolves PYTHONPATH issues. - * - * @param filePaths - * @throws IOException - * @throws URISyntaxException - */ - private void prepareFiles(String tempFilePath, String... filePaths) throws IOException, URISyntaxException { - //Flink python package - clearPath(tempFilePath); - FileCache.copy(new Path(pythonLibraryPath), new Path(tmpPlanFilesDir), false); - - //plan file - copyFile(filePaths[0], tempFilePath, FLINK_PYTHON_PLAN_NAME); - - //additional files/folders - for (int x = 1; x < filePaths.length; x++) { - copyFile(filePaths[x], tempFilePath, null); + private static void deleteIfExists(Path path) throws IOException { + FileSystem fs = path.getFileSystem(); + if (fs.exists(path)) { + fs.delete(path, true); } } - private static void clearPath(String path) throws IOException { - FileSystem fs = FileSystem.get(new Path(path).toUri()); - if (fs.exists(new Path(path))) { - fs.delete(new Path(path), true); - } - } - - private static void copyFile(String path, String target, String name) throws IOException, URISyntaxException { - if (path.endsWith("/")) { - path = path.substring(0, path.length() - 1); - } - String identifier = name == null ? 
path.substring(path.lastIndexOf("/")) : name; - String tmpFilePath = target + "/" + identifier; - clearPath(tmpFilePath); - Path p = new Path(path); - FileCache.copy(p.makeQualified(FileSystem.get(p.toUri())), new Path(tmpFilePath), true); - } - - private void distributeFiles(String tmpPath, ExecutionEnvironment env) throws IOException { - clearPath(tmpDistributedDir); - FileCache.copy(new Path(tmpPath), new Path(tmpDistributedDir), true); - env.registerCachedFile(new Path(tmpDistributedDir).toUri().toString(), FLINK_PYTHON_DC_ID); - } - - private void startPython(String tempPath, String[] args) throws IOException { - StringBuilder arguments = new StringBuilder(); - for (String arg : args) { - arguments.append(" ").append(arg); - } - - operatorConfig.setString(PLAN_ARGUMENTS_KEY, arguments.toString()); - - streamer = new PythonPlanStreamer(operatorConfig); - streamer.open(tempPath, arguments.toString()); - } - - private void sendResult(JobExecutionResult jer) throws IOException { - long runtime = jer.getNetRuntime(); - streamer.sendRecord(runtime); - } - - private void close() { - try { //prevent throwing exception so that previous exceptions aren't hidden. - FileSystem hdfs = new Path(tmpDistributedDir).getFileSystem(); - hdfs.delete(new Path(tmpDistributedDir), true); - - FileSystem local = FileSystem.getLocalFileSystem(); - local.delete(new Path(tmpPlanFilesDir), true); - streamer.close(); - } catch (NullPointerException ignored) { - } catch (IOException ioe) { - LOG.error("PythonAPI file cleanup failed. 
{}", ioe.getMessage()); - } + private static void copyFile(Path source, Path targetDirectory, String name) throws IOException { + Path targetFilePath = new Path(targetDirectory, name); + deleteIfExists(targetFilePath); + FileCache.copy(source, targetFilePath, true); } //====Plan========================================================================================================== - private void receivePlan() throws IOException { - env = ExecutionEnvironment.getExecutionEnvironment(); + private void receivePlan(ExecutionEnvironment env) throws IOException { //IDs used in HashMap of sets are only unique for each environment - sets.reset(); - receiveParameters(); - receiveOperations(); + receiveParameters(env); + receiveOperations(env); } //====Environment=================================================================================================== @@ -260,7 +229,7 @@ public class PythonPlanBinder { ID } - private void receiveParameters() throws IOException { + private void receiveParameters(ExecutionEnvironment env) throws IOException { for (int x = 0; x < 4; x++) { Tuple value = (Tuple) streamer.getRecord(true); switch (Parameters.valueOf(((String) value.getField(0)).toUpperCase())) { @@ -271,7 +240,7 @@ public class PythonPlanBinder { case MODE: if (value.<Boolean>getField(1)) { LOG.info("Local execution specified, using default for {}.", PythonOptions.DC_TMP_DIR); - tmpDistributedDir = PythonOptions.DC_TMP_DIR.defaultValue(); + tmpDistributedDir = new Path(PythonOptions.DC_TMP_DIR.defaultValue()); } break; case RETRY: @@ -301,23 +270,23 @@ public class PythonPlanBinder { COGROUP, CROSS, CROSS_H, CROSS_T, FILTER, FLATMAP, GROUPREDUCE, JOIN, JOIN_H, JOIN_T, MAP, REDUCE, MAPPARTITION } - private void receiveOperations() throws IOException { + private void receiveOperations(ExecutionEnvironment env) throws IOException { Integer operationCount = (Integer) streamer.getRecord(true); for (int x = 0; x < operationCount; x++) { PythonOperationInfo info = new 
PythonOperationInfo(streamer, currentEnvironmentID); Operation op = Operation.valueOf(info.identifier.toUpperCase()); switch (op) { case SOURCE_CSV: - createCsvSource(info); + createCsvSource(env, info); break; case SOURCE_TEXT: - createTextSource(info); + createTextSource(env, info); break; case SOURCE_VALUE: - createValueSource(info); + createValueSource(env, info); break; case SOURCE_SEQ: - createSequenceSource(info); + createSequenceSource(env, info); break; case SINK_CSV: createCsvSink(info); @@ -395,12 +364,8 @@ public class PythonPlanBinder { } } - private int getParallelism(PythonOperationInfo info) { - return info.parallelism == -1 ? env.getParallelism() : info.parallelism; - } - @SuppressWarnings("unchecked") - private <T extends Tuple> void createCsvSource(PythonOperationInfo info) { + private <T extends Tuple> void createCsvSource(ExecutionEnvironment env, PythonOperationInfo info) { if (!(info.types instanceof TupleTypeInfo)) { throw new RuntimeException("The output type of a csv source has to be a tuple. 
The derived type is " + info); } @@ -408,41 +373,41 @@ public class PythonPlanBinder { String lineD = info.lineDelimiter; String fieldD = info.fieldDelimiter; TupleTypeInfo<T> types = (TupleTypeInfo<T>) info.types; - sets.add(info.setID, env.createInput(new TupleCsvInputFormat<>(path, lineD, fieldD, types), types).setParallelism(getParallelism(info)).name("CsvSource") - .map(new SerializerMap<T>()).setParallelism(getParallelism(info)).name("CsvSourcePostStep")); + sets.add(info.setID, env.createInput(new TupleCsvInputFormat<>(path, lineD, fieldD, types), types).setParallelism(info.parallelism).name("CsvSource") + .map(new SerializerMap<T>()).setParallelism(info.parallelism).name("CsvSourcePostStep")); } - private void createTextSource(PythonOperationInfo info) { - sets.add(info.setID, env.readTextFile(info.path).setParallelism(getParallelism(info)).name("TextSource") - .map(new SerializerMap<String>()).setParallelism(getParallelism(info)).name("TextSourcePostStep")); + private void createTextSource(ExecutionEnvironment env, PythonOperationInfo info) { + sets.add(info.setID, env.readTextFile(info.path).setParallelism(info.parallelism).name("TextSource") + .map(new SerializerMap<String>()).setParallelism(info.parallelism).name("TextSourcePostStep")); } - private void createValueSource(PythonOperationInfo info) { - sets.add(info.setID, env.fromElements(info.values).setParallelism(getParallelism(info)).name("ValueSource") - .map(new SerializerMap<>()).setParallelism(getParallelism(info)).name("ValueSourcePostStep")); + private void createValueSource(ExecutionEnvironment env, PythonOperationInfo info) { + sets.add(info.setID, env.fromElements(info.values).setParallelism(info.parallelism).name("ValueSource") + .map(new SerializerMap<>()).setParallelism(info.parallelism).name("ValueSourcePostStep")); } - private void createSequenceSource(PythonOperationInfo info) { - sets.add(info.setID, env.generateSequence(info.frm, 
info.to).setParallelism(getParallelism(info)).name("SequenceSource") - .map(new SerializerMap<Long>()).setParallelism(getParallelism(info)).name("SequenceSourcePostStep")); + private void createSequenceSource(ExecutionEnvironment env, PythonOperationInfo info) { + sets.add(info.setID, env.generateSequence(info.frm, info.to).setParallelism(info.parallelism).name("SequenceSource") + .map(new SerializerMap<Long>()).setParallelism(info.parallelism).name("SequenceSourcePostStep")); } private void createCsvSink(PythonOperationInfo info) { DataSet<byte[]> parent = sets.getDataSet(info.parentID); - parent.map(new StringTupleDeserializerMap()).setParallelism(getParallelism(info)).name("CsvSinkPreStep") - .writeAsCsv(info.path, info.lineDelimiter, info.fieldDelimiter, info.writeMode).setParallelism(getParallelism(info)).name("CsvSink"); + parent.map(new StringTupleDeserializerMap()).setParallelism(info.parallelism).name("CsvSinkPreStep") + .writeAsCsv(info.path, info.lineDelimiter, info.fieldDelimiter, info.writeMode).setParallelism(info.parallelism).name("CsvSink"); } private void createTextSink(PythonOperationInfo info) { DataSet<byte[]> parent = sets.getDataSet(info.parentID); - parent.map(new StringDeserializerMap()).setParallelism(getParallelism(info)) - .writeAsText(info.path, info.writeMode).setParallelism(getParallelism(info)).name("TextSink"); + parent.map(new StringDeserializerMap()).setParallelism(info.parallelism) + .writeAsText(info.path, info.writeMode).setParallelism(info.parallelism).name("TextSink"); } private void createPrintSink(PythonOperationInfo info) { DataSet<byte[]> parent = sets.getDataSet(info.parentID); - parent.map(new StringDeserializerMap()).setParallelism(getParallelism(info)).name("PrintSinkPreStep") - .output(new PrintingOutputFormat<String>(info.toError)).setParallelism(getParallelism(info)); + parent.map(new StringDeserializerMap()).setParallelism(info.parallelism).name("PrintSinkPreStep") + .output(new 
PrintingOutputFormat<String>(info.toError)).setParallelism(info.parallelism); } private void createBroadcastVariable(PythonOperationInfo info) { @@ -466,8 +431,8 @@ public class PythonPlanBinder { private <K extends Tuple> void createDistinctOperation(PythonOperationInfo info) { DataSet<Tuple2<K, byte[]>> op = sets.getDataSet(info.parentID); DataSet<byte[]> result = op - .distinct(info.keys).setParallelism(getParallelism(info)).name("Distinct") - .map(new KeyDiscarder<K>()).setParallelism(getParallelism(info)).name("DistinctPostStep"); + .distinct(info.keys).setParallelism(info.parallelism).name("Distinct") + .map(new KeyDiscarder<K>()).setParallelism(info.parallelism).name("DistinctPostStep"); sets.add(info.setID, result); } @@ -475,17 +440,17 @@ public class PythonPlanBinder { if (sets.isDataSet(info.parentID)) { DataSet<byte[]> op = sets.getDataSet(info.parentID); sets.add(info.setID, op - .first(info.count).setParallelism(getParallelism(info)).name("First")); + .first(info.count).setParallelism(info.parallelism).name("First")); } else if (sets.isUnsortedGrouping(info.parentID)) { UnsortedGrouping<Tuple2<K, byte[]>> op = sets.getUnsortedGrouping(info.parentID); sets.add(info.setID, op - .first(info.count).setParallelism(getParallelism(info)).name("First") - .map(new KeyDiscarder<K>()).setParallelism(getParallelism(info)).name("FirstPostStep")); + .first(info.count).setParallelism(info.parallelism).name("First") + .map(new KeyDiscarder<K>()).setParallelism(info.parallelism).name("FirstPostStep")); } else if (sets.isSortedGrouping(info.parentID)) { SortedGrouping<Tuple2<K, byte[]>> op = sets.getSortedGrouping(info.parentID); sets.add(info.setID, op - .first(info.count).setParallelism(getParallelism(info)).name("First") - .map(new KeyDiscarder<K>()).setParallelism(getParallelism(info)).name("FirstPostStep")); + .first(info.count).setParallelism(info.parallelism).name("First") + .map(new KeyDiscarder<K>()).setParallelism(info.parallelism).name("FirstPostStep")); } } 
@@ -497,14 +462,14 @@ public class PythonPlanBinder { private <K extends Tuple> void createHashPartitionOperation(PythonOperationInfo info) { DataSet<Tuple2<K, byte[]>> op1 = sets.getDataSet(info.parentID); DataSet<byte[]> result = op1 - .partitionByHash(info.keys).setParallelism(getParallelism(info)) - .map(new KeyDiscarder<K>()).setParallelism(getParallelism(info)).name("HashPartitionPostStep"); + .partitionByHash(info.keys).setParallelism(info.parallelism) + .map(new KeyDiscarder<K>()).setParallelism(info.parallelism).name("HashPartitionPostStep"); sets.add(info.setID, result); } private void createRebalanceOperation(PythonOperationInfo info) { DataSet<?> op = sets.getDataSet(info.parentID); - sets.add(info.setID, op.rebalance().setParallelism(getParallelism(info)).name("Rebalance")); + sets.add(info.setID, op.rebalance().setParallelism(info.parallelism).name("Rebalance")); } private void createSortOperation(PythonOperationInfo info) { @@ -520,7 +485,7 @@ public class PythonPlanBinder { private <IN> void createUnionOperation(PythonOperationInfo info) { DataSet<IN> op1 = sets.getDataSet(info.parentID); DataSet<IN> op2 = sets.getDataSet(info.otherID); - sets.add(info.setID, op1.union(op2).setParallelism(getParallelism(info)).name("Union")); + sets.add(info.setID, op1.union(op2).setParallelism(info.parallelism).name("Union")); } private <IN1, IN2, OUT> void createCoGroupOperation(PythonOperationInfo info, TypeInformation<OUT> type) { @@ -529,7 +494,7 @@ public class PythonPlanBinder { Keys.ExpressionKeys<IN1> key1 = new Keys.ExpressionKeys<>(info.keys1, op1.getType()); Keys.ExpressionKeys<IN2> key2 = new Keys.ExpressionKeys<>(info.keys2, op2.getType()); PythonCoGroup<IN1, IN2, OUT> pcg = new PythonCoGroup<>(operatorConfig, info.envID, info.setID, type); - sets.add(info.setID, new CoGroupRawOperator<>(op1, op2, key1, key2, pcg, type, info.name).setParallelism(getParallelism(info))); + sets.add(info.setID, new CoGroupRawOperator<>(op1, op2, key1, key2, pcg, type, 
info.name).setParallelism(info.parallelism)); } private <IN1, IN2, OUT> void createCrossOperation(DatasizeHint mode, PythonOperationInfo info, TypeInformation<OUT> type) { @@ -551,11 +516,11 @@ public class PythonPlanBinder { throw new IllegalArgumentException("Invalid Cross mode specified: " + mode); } - defaultResult.setParallelism(getParallelism(info)); + defaultResult.setParallelism(info.parallelism); if (info.usesUDF) { sets.add(info.setID, defaultResult .mapPartition(new PythonMapPartition<Tuple2<IN1, IN2>, OUT>(operatorConfig, info.envID, info.setID, type)) - .setParallelism(getParallelism(info)).name(info.name)); + .setParallelism(info.parallelism).name(info.name)); } else { sets.add(info.setID, defaultResult.name("DefaultCross")); } @@ -565,14 +530,14 @@ public class PythonPlanBinder { DataSet<IN> op1 = sets.getDataSet(info.parentID); sets.add(info.setID, op1 .mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type)) - .setParallelism(getParallelism(info)).name(info.name)); + .setParallelism(info.parallelism).name(info.name)); } private <IN, OUT> void createFlatMapOperation(PythonOperationInfo info, TypeInformation<OUT> type) { DataSet<IN> op1 = sets.getDataSet(info.parentID); sets.add(info.setID, op1 .mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type)) - .setParallelism(getParallelism(info)).name(info.name)); + .setParallelism(info.parallelism).name(info.name)); } private void createGroupReduceOperation(PythonOperationInfo info) { @@ -587,23 +552,23 @@ public class PythonPlanBinder { private <IN, OUT> DataSet<OUT> applyGroupReduceOperation(DataSet<IN> op1, PythonOperationInfo info, TypeInformation<OUT> type) { return op1 - .reduceGroup(new IdentityGroupReduce<IN>()).setCombinable(false).name("PythonGroupReducePreStep").setParallelism(getParallelism(info)) + .reduceGroup(new 
IdentityGroupReduce<IN>()).setCombinable(false).name("PythonGroupReducePreStep").setParallelism(info.parallelism) .mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type)) - .setParallelism(getParallelism(info)).name(info.name); + .setParallelism(info.parallelism).name(info.name); } private <IN, OUT> DataSet<OUT> applyGroupReduceOperation(UnsortedGrouping<IN> op1, PythonOperationInfo info, TypeInformation<OUT> type) { return op1 - .reduceGroup(new IdentityGroupReduce<IN>()).setCombinable(false).setParallelism(getParallelism(info)).name("PythonGroupReducePreStep") + .reduceGroup(new IdentityGroupReduce<IN>()).setCombinable(false).setParallelism(info.parallelism).name("PythonGroupReducePreStep") .mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type)) - .setParallelism(getParallelism(info)).name(info.name); + .setParallelism(info.parallelism).name(info.name); } private <IN, OUT> DataSet<OUT> applyGroupReduceOperation(SortedGrouping<IN> op1, PythonOperationInfo info, TypeInformation<OUT> type) { return op1 - .reduceGroup(new IdentityGroupReduce<IN>()).setCombinable(false).setParallelism(getParallelism(info)).name("PythonGroupReducePreStep") + .reduceGroup(new IdentityGroupReduce<IN>()).setCombinable(false).setParallelism(info.parallelism).name("PythonGroupReducePreStep") .mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type)) - .setParallelism(getParallelism(info)).name(info.name); + .setParallelism(info.parallelism).name(info.name); } private <IN1, IN2, OUT> void createJoinOperation(DatasizeHint mode, PythonOperationInfo info, TypeInformation<OUT> type) { @@ -611,11 +576,11 @@ public class PythonPlanBinder { DataSet<IN2> op2 = sets.getDataSet(info.otherID); if (info.usesUDF) { - sets.add(info.setID, createDefaultJoin(op1, op2, info.keys1, info.keys2, mode, getParallelism(info)) + sets.add(info.setID, createDefaultJoin(op1, op2, info.keys1, info.keys2, mode, 
info.parallelism) .mapPartition(new PythonMapPartition<Tuple2<byte[], byte[]>, OUT>(operatorConfig, info.envID, info.setID, type)) - .setParallelism(getParallelism(info)).name(info.name)); + .setParallelism(info.parallelism).name(info.name)); } else { - sets.add(info.setID, createDefaultJoin(op1, op2, info.keys1, info.keys2, mode, getParallelism(info))); + sets.add(info.setID, createDefaultJoin(op1, op2, info.keys1, info.keys2, mode, info.parallelism)); } } @@ -642,14 +607,14 @@ public class PythonPlanBinder { DataSet<IN> op1 = sets.getDataSet(info.parentID); sets.add(info.setID, op1 .mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type)) - .setParallelism(getParallelism(info)).name(info.name)); + .setParallelism(info.parallelism).name(info.name)); } private <IN, OUT> void createMapPartitionOperation(PythonOperationInfo info, TypeInformation<OUT> type) { DataSet<IN> op1 = sets.getDataSet(info.parentID); sets.add(info.setID, op1 .mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type)) - .setParallelism(getParallelism(info)).name(info.name)); + .setParallelism(info.parallelism).name(info.name)); } private void createReduceOperation(PythonOperationInfo info) { @@ -664,15 +629,15 @@ public class PythonPlanBinder { private <IN, OUT> DataSet<OUT> applyReduceOperation(DataSet<IN> op1, PythonOperationInfo info, TypeInformation<OUT> type) { return op1 - .reduceGroup(new IdentityGroupReduce<IN>()).setCombinable(false).setParallelism(getParallelism(info)).name("PythonReducePreStep") + .reduceGroup(new IdentityGroupReduce<IN>()).setCombinable(false).setParallelism(info.parallelism).name("PythonReducePreStep") .mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type)) - .setParallelism(getParallelism(info)).name(info.name); + .setParallelism(info.parallelism).name(info.name); } private <IN, OUT> DataSet<OUT> applyReduceOperation(UnsortedGrouping<IN> op1, 
PythonOperationInfo info, TypeInformation<OUT> type) { return op1 - .reduceGroup(new IdentityGroupReduce<IN>()).setCombinable(false).setParallelism(getParallelism(info)).name("PythonReducePreStep") + .reduceGroup(new IdentityGroupReduce<IN>()).setCombinable(false).setParallelism(info.parallelism).name("PythonReducePreStep") .mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type)) - .setParallelism(getParallelism(info)).name(info.name); + .setParallelism(info.parallelism).name(info.name); } }