This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit d56c45a090de689a01fc8ddb884e6baa29568322 Author: Stephan Ewen <se...@apache.org> AuthorDate: Fri Jul 12 18:28:36 2019 +0200 [FLINK-13250][blink runner] Make sure that all nodes have a concrete resource profile This change is covered by various existing integration tests that failed prior to this fix. --- .../src/main/java/org/apache/flink/table/executor/BatchExecutor.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java index 8a90b80..f4bb2ee 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java @@ -84,7 +84,9 @@ public class BatchExecutor extends ExecutorBase { // All transformations should set managed memory size. ResourceSpec managedResourceSpec = NodeResourceUtil.fromManagedMem(0); streamGraph.getStreamNodes().forEach(sn -> { - sn.setResources(sn.getMinResources().merge(managedResourceSpec), sn.getPreferredResources().merge(managedResourceSpec)); + if (sn.getMinResources().equals(ResourceSpec.DEFAULT)) { + sn.setResources(managedResourceSpec, managedResourceSpec); + } }); streamGraph.setChaining(true); streamGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES);