This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new a81609e2c82 [FLINK-27431][rpc] Allow RpcTimeouts to be specified as Duration a81609e2c82 is described below commit a81609e2c82d6012659e52a35bab881d810e016b Author: Chesnay Schepler <ches...@apache.org> AuthorDate: Thu Apr 28 08:45:32 2022 +0200 [FLINK-27431][rpc] Allow RpcTimeouts to be specified as Duration --- .../runtime/rpc/akka/AkkaInvocationHandler.java | 5 +++++ .../runtime/rpc/akka/TimeoutCallStackTest.java | 24 ++++++++++++++++++++-- 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java index 5f9584d1443..acc5f4248cb 100644 --- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java +++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java @@ -49,6 +49,7 @@ import java.io.IOException; import java.lang.annotation.Annotation; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; +import java.time.Duration; import java.util.Arrays; import java.util.Objects; import java.util.concurrent.Callable; @@ -338,10 +339,14 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc if (isRpcTimeout(parameterAnnotations[i])) { if (args[i] instanceof Time) { return (Time) args[i]; + } else if (args[i] instanceof Duration) { + return Time.fromDuration((Duration) args[i]); } else { throw new RuntimeException( "The rpc timeout parameter must be of type " + Time.class.getName() + + " or " + + Duration.class.getName() + ". The type " + args[i].getClass().getName() + " is not supported."); diff --git a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/TimeoutCallStackTest.java b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/TimeoutCallStackTest.java index 627edc34426..1349cb31ff0 100644 --- a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/TimeoutCallStackTest.java +++ b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/TimeoutCallStackTest.java @@ -34,12 +34,14 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Function; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -76,10 +78,20 @@ class TimeoutCallStackTest { } @Test - void testTimeoutException() throws Exception { + void testTimeoutExceptionWithTime() throws Exception { + testTimeoutException(gateway -> gateway.callThatTimesOut(Time.milliseconds(1))); + } + + @Test + void testTimeoutExceptionWithDuration() throws Exception { + testTimeoutException(gateway -> gateway.callThatTimesOut(Duration.ofMillis(1))); + } + + private void testTimeoutException( + Function<TestingGateway, CompletableFuture<Void>> timeoutOperation) throws Exception { final TestingGateway gateway = createTestingGateway(); - final CompletableFuture<Void> future = gateway.callThatTimesOut(Time.milliseconds(1)); + final CompletableFuture<Void> future = timeoutOperation.apply(gateway); assertThatThrownBy(future::get) .hasCauseInstanceOf(TimeoutException.class) @@ -108,6 +120,8 @@ class TimeoutCallStackTest { private interface TestingGateway extends RpcGateway { CompletableFuture<Void> callThatTimesOut(@RpcTimeout Time timeout); + + CompletableFuture<Void> callThatTimesOut(@RpcTimeout Duration timeout); } private static final class TestingRpcEndpoint extends RpcEndpoint implements TestingGateway { @@ -121,5 +135,11 @@ class TimeoutCallStackTest { // return a future that never completes, so the call is guaranteed to time out return new CompletableFuture<>(); } + + @Override + public CompletableFuture<Void> callThatTimesOut(@RpcTimeout Duration timeout) { + // return a future that never completes, so the call is guaranteed to time out + return new CompletableFuture<>(); + } } }