This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a81609e2c82 [FLINK-27431][rpc] Allow RpcTimeouts to be specified as Duration
a81609e2c82 is described below

commit a81609e2c82d6012659e52a35bab881d810e016b
Author: Chesnay Schepler <ches...@apache.org>
AuthorDate: Thu Apr 28 08:45:32 2022 +0200

    [FLINK-27431][rpc] Allow RpcTimeouts to be specified as Duration
---
 .../runtime/rpc/akka/AkkaInvocationHandler.java    |  5 +++++
 .../runtime/rpc/akka/TimeoutCallStackTest.java     | 24 ++++++++++++++++++++--
 2 files changed, 27 insertions(+), 2 deletions(-)

diff --git a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
index 5f9584d1443..acc5f4248cb 100644
--- a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
+++ b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
@@ -49,6 +49,7 @@ import java.io.IOException;
 import java.lang.annotation.Annotation;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Objects;
 import java.util.concurrent.Callable;
@@ -338,10 +339,14 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
                 if (isRpcTimeout(parameterAnnotations[i])) {
                     if (args[i] instanceof Time) {
                         return (Time) args[i];
+                    } else if (args[i] instanceof Duration) {
+                        return Time.fromDuration((Duration) args[i]);
                     } else {
                         throw new RuntimeException(
                                 "The rpc timeout parameter must be of type "
                                         + Time.class.getName()
+                                        + " or "
+                                        + Duration.class.getName()
                                         + ". The type "
                                         + args[i].getClass().getName()
                                         + " is not supported.");
diff --git a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/TimeoutCallStackTest.java b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/TimeoutCallStackTest.java
index 627edc34426..1349cb31ff0 100644
--- a/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/TimeoutCallStackTest.java
+++ b/flink-rpc/flink-rpc-akka/src/test/java/org/apache/flink/runtime/rpc/akka/TimeoutCallStackTest.java
@@ -34,12 +34,14 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -76,10 +78,20 @@ class TimeoutCallStackTest {
     }
 
     @Test
-    void testTimeoutException() throws Exception {
+    void testTimeoutExceptionWithTime() throws Exception {
+        testTimeoutException(gateway -> gateway.callThatTimesOut(Time.milliseconds(1)));
+    }
+
+    @Test
+    void testTimeoutExceptionWithDuration() throws Exception {
+        testTimeoutException(gateway -> gateway.callThatTimesOut(Duration.ofMillis(1)));
+    }
+
+    private void testTimeoutException(
+            Function<TestingGateway, CompletableFuture<Void>> timeoutOperation) throws Exception {
         final TestingGateway gateway = createTestingGateway();
 
-        final CompletableFuture<Void> future = gateway.callThatTimesOut(Time.milliseconds(1));
+        final CompletableFuture<Void> future = timeoutOperation.apply(gateway);
 
         assertThatThrownBy(future::get)
                 .hasCauseInstanceOf(TimeoutException.class)
@@ -108,6 +120,8 @@ class TimeoutCallStackTest {
     private interface TestingGateway extends RpcGateway {
 
         CompletableFuture<Void> callThatTimesOut(@RpcTimeout Time timeout);
+
+        CompletableFuture<Void> callThatTimesOut(@RpcTimeout Duration timeout);
     }
 
     private static final class TestingRpcEndpoint extends RpcEndpoint implements TestingGateway {
@@ -121,5 +135,11 @@ class TimeoutCallStackTest {
             // return a future that never completes, so the call is guaranteed to time out
             return new CompletableFuture<>();
         }
+
+        @Override
+        public CompletableFuture<Void> callThatTimesOut(@RpcTimeout Duration timeout) {
+            // return a future that never completes, so the call is guaranteed to time out
+            return new CompletableFuture<>();
+        }
     }
 }

Reply via email to