This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 22c1eb44df7 [FLINK-33344][rpc] Replace Time with Duration in RpcInputSplitProvider
22c1eb44df7 is described below

commit 22c1eb44df7226c8e9045789e45c71c93668c644
Author: Jiabao Sun <jiabao....@xtransfer.cn>
AuthorDate: Tue Oct 24 12:05:02 2023 +0800

    [FLINK-33344][rpc] Replace Time with Duration in RpcInputSplitProvider
---
 .../java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java | 4 ++--
 .../flink/runtime/taskexecutor/TaskManagerConfiguration.java     | 9 ++++-----
 .../flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java    | 9 +++++----
 3 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index d42a5658598..680514f09a6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -1470,7 +1470,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
                         getResourceID(),
                         taskExecutorRegistrationId,
                         taskSlotTable.createSlotReport(getResourceID()),
-                        taskManagerConfiguration.getRpcTimeout());
+                        Time.fromDuration(taskManagerConfiguration.getRpcTimeout()));
 
         slotReportResponseFuture.whenCompleteAsync(
                 (acknowledge, throwable) -> {
@@ -1610,7 +1610,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
                     jobMasterGateway.offerSlots(
                             getResourceID(),
                             reservedSlots,
-                            taskManagerConfiguration.getRpcTimeout());
+                            Time.fromDuration(taskManagerConfiguration.getRpcTimeout()));
 
             acceptedSlotsFuture.whenCompleteAsync(
                     handleAcceptedSlotOffers(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
index 2ec461b40cd..c3c76220c10 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
@@ -51,7 +51,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
 
     private final String[] tmpDirectories;
 
-    private final Time rpcTimeout;
+    private final Duration rpcTimeout;
 
     private final Time slotTimeout;
 
@@ -79,7 +79,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
             ResourceProfile defaultSlotResourceProfile,
             ResourceProfile totalResourceProfile,
             String[] tmpDirectories,
-            Time rpcTimeout,
+            Duration rpcTimeout,
             Time slotTimeout,
             @Nullable Duration maxRegistrationDuration,
             Configuration configuration,
@@ -121,7 +121,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
         return totalResourceProfile;
     }
 
-    public Time getRpcTimeout() {
+    public Duration getRpcTimeout() {
         return rpcTimeout;
     }
 
@@ -195,8 +195,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
 
         final String[] tmpDirPaths = ConfigurationUtils.parseTempDirectories(configuration);
 
-        final Time rpcTimeout =
-                Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION));
+        final Duration rpcTimeout = configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION);
 
         LOG.debug("Messages have a max timeout of " + rpcTimeout);
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
index 31b4ae67ec8..0afdf05f0eb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.taskexecutor.rpc;
 
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -29,19 +28,21 @@ import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 
+import java.time.Duration;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 public class RpcInputSplitProvider implements InputSplitProvider {
     private final JobMasterGateway jobMasterGateway;
     private final JobVertexID jobVertexID;
     private final ExecutionAttemptID executionAttemptID;
-    private final Time timeout;
+    private final Duration timeout;
 
     public RpcInputSplitProvider(
             JobMasterGateway jobMasterGateway,
             JobVertexID jobVertexID,
             ExecutionAttemptID executionAttemptID,
-            Time timeout) {
+            Duration timeout) {
         this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway);
         this.jobVertexID = Preconditions.checkNotNull(jobVertexID);
         this.executionAttemptID = Preconditions.checkNotNull(executionAttemptID);
@@ -58,7 +59,7 @@ public class RpcInputSplitProvider implements InputSplitProvider {
 
         try {
             SerializedInputSplit serializedInputSplit =
-                    futureInputSplit.get(timeout.getSize(), timeout.getUnit());
+                    futureInputSplit.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
 
             if (serializedInputSplit.isEmpty()) {
                 return null;

Reply via email to