This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 22c1eb44df7 [FLINK-33344][rpc] Replace Time with Duration in RpcInputSplitProvider 22c1eb44df7 is described below commit 22c1eb44df7226c8e9045789e45c71c93668c644 Author: Jiabao Sun <jiabao....@xtransfer.cn> AuthorDate: Tue Oct 24 12:05:02 2023 +0800 [FLINK-33344][rpc] Replace Time with Duration in RpcInputSplitProvider --- .../java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java | 4 ++-- .../flink/runtime/taskexecutor/TaskManagerConfiguration.java | 9 ++++----- .../flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java | 9 +++++---- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index d42a5658598..680514f09a6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -1470,7 +1470,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { getResourceID(), taskExecutorRegistrationId, taskSlotTable.createSlotReport(getResourceID()), - taskManagerConfiguration.getRpcTimeout()); + Time.fromDuration(taskManagerConfiguration.getRpcTimeout())); slotReportResponseFuture.whenCompleteAsync( (acknowledge, throwable) -> { @@ -1610,7 +1610,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { jobMasterGateway.offerSlots( getResourceID(), reservedSlots, - taskManagerConfiguration.getRpcTimeout()); + Time.fromDuration(taskManagerConfiguration.getRpcTimeout())); acceptedSlotsFuture.whenCompleteAsync( handleAcceptedSlotOffers( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java index 2ec461b40cd..c3c76220c10 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java @@ -51,7 +51,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo { private final String[] tmpDirectories; - private final Time rpcTimeout; + private final Duration rpcTimeout; private final Time slotTimeout; @@ -79,7 +79,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo { ResourceProfile defaultSlotResourceProfile, ResourceProfile totalResourceProfile, String[] tmpDirectories, - Time rpcTimeout, + Duration rpcTimeout, Time slotTimeout, @Nullable Duration maxRegistrationDuration, Configuration configuration, @@ -121,7 +121,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo { return totalResourceProfile; } - public Time getRpcTimeout() { + public Duration getRpcTimeout() { return rpcTimeout; } @@ -195,8 +195,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo { final String[] tmpDirPaths = ConfigurationUtils.parseTempDirectories(configuration); - final Time rpcTimeout = - Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION)); + final Duration rpcTimeout = configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION); LOG.debug("Messages have a max timeout of " + rpcTimeout); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java index 31b4ae67ec8..0afdf05f0eb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.taskexecutor.rpc; -import org.apache.flink.api.common.time.Time; import org.apache.flink.core.io.InputSplit; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -29,19 +28,21 @@ import org.apache.flink.runtime.jobmaster.SerializedInputSplit; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.Preconditions; +import java.time.Duration; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; public class RpcInputSplitProvider implements InputSplitProvider { private final JobMasterGateway jobMasterGateway; private final JobVertexID jobVertexID; private final ExecutionAttemptID executionAttemptID; - private final Time timeout; + private final Duration timeout; public RpcInputSplitProvider( JobMasterGateway jobMasterGateway, JobVertexID jobVertexID, ExecutionAttemptID executionAttemptID, - Time timeout) { + Duration timeout) { this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway); this.jobVertexID = Preconditions.checkNotNull(jobVertexID); this.executionAttemptID = Preconditions.checkNotNull(executionAttemptID); @@ -58,7 +59,7 @@ public class RpcInputSplitProvider implements InputSplitProvider { try { SerializedInputSplit serializedInputSplit = - futureInputSplit.get(timeout.getSize(), timeout.getUnit()); + futureInputSplit.get(timeout.toMillis(), TimeUnit.MILLISECONDS); if (serializedInputSplit.isEmpty()) { return null;