This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9190a6767f725ce7ff5f46c7c8f8a62c45503c34
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Fri Jul 12 18:28:36 2019 +0200

    [FLINK-13250][blink runner] Make sure that all nodes have a concrete 
resource profile
    
    This change is covered by various existing integration tests that failed 
prior to this fix.
---
 .../src/main/java/org/apache/flink/table/executor/BatchExecutor.java  | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java
index 8a90b80..f4bb2ee 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java
@@ -84,7 +84,9 @@ public class BatchExecutor extends ExecutorBase {
                // All transformations should set managed memory size.
                ResourceSpec managedResourceSpec = 
NodeResourceUtil.fromManagedMem(0);
                streamGraph.getStreamNodes().forEach(sn -> {
-                       
sn.setResources(sn.getMinResources().merge(managedResourceSpec), 
sn.getPreferredResources().merge(managedResourceSpec));
+                       if (sn.getMinResources().equals(ResourceSpec.DEFAULT)) {
+                               sn.setResources(managedResourceSpec, 
managedResourceSpec);
+                       }
                });
                streamGraph.setChaining(true);
                streamGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES);

Reply via email to