This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fc9da62894ca1e9d3187a653d4149da040005cb4
Author: Till Rohrmann <trohrm...@apache.org>
AuthorDate: Fri Aug 20 17:21:05 2021 +0200

    [FLINK-22932][tests] Harden RocksDBStateBackendWindowITCase by using 
RpcUtils.INF_TIMEOUT for MiniCluster RPC timeout
    
    This commit sets the default rpc timeout for the MiniCluster to 
RpcUtils.INF_TIMEOUT. This should be ok since
    in a local execution the communication between the different components 
should be reliable.
    
    This closes #16914.
---
 .../main/java/org/apache/flink/runtime/minicluster/MiniCluster.java | 4 +++-
 .../apache/flink/runtime/minicluster/MiniClusterConfiguration.java  | 6 ------
 2 files changed, 3 insertions(+), 7 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 5d32bf7..c597a34 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -216,7 +216,9 @@ public class MiniCluster implements AutoCloseableAsync {
                                         .getNumTaskManagers()); // common + JM 
+ RM + TMs
         this.dispatcherResourceManagerComponents = new ArrayList<>(1);
 
-        this.rpcTimeout = miniClusterConfiguration.getRpcTimeout();
+        // There shouldn't be any lost messages between the MiniCluster and 
the Flink components
+        // since they all run in the same process.
+        this.rpcTimeout = RpcUtils.INF_TIMEOUT;
         this.terminationFuture = CompletableFuture.completedFuture(null);
         running = false;
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
index 8b7970f..2dacb0d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
@@ -18,14 +18,12 @@
 
 package org.apache.flink.runtime.minicluster;
 
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils;
 import org.apache.flink.util.Preconditions;
 
@@ -120,10 +118,6 @@ public class MiniClusterConfiguration {
                 : configuration.getString(TaskManagerOptions.BIND_HOST, 
"localhost");
     }
 
-    public Time getRpcTimeout() {
-        return AkkaUtils.getTimeoutAsTime(configuration);
-    }
-
     public UnmodifiableConfiguration getConfiguration() {
         return configuration;
     }

Reply via email to