This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e96cdf12d6ff42079b39f5ef8966258e60083b48
Author: gyao <g...@data-artisans.com>
AuthorDate: Wed Oct 17 11:30:10 2018 +0200

    [FLINK-10309][tests] Replace wait-notify with CountDownLatches.
    
    Replace the use of wait-notify with CountDownLatches in
    RestServerEndpointITCase. The wait() method can have spurious wakeups which was
    previously not considered. Also refactor the tests that use the TestHandler to
    avoid code duplication, and to separate concerns.
---
 .../runtime/rest/RestServerEndpointITCase.java     | 215 ++++++++++++---------
 1 file changed, 122 insertions(+), 93 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
index 48492f8..0c28745 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
@@ -91,11 +91,14 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.hamcrest.CoreMatchers.hasItems;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.instanceOf;
@@ -267,37 +270,25 @@ public class RestServerEndpointITCase extends TestLogger {
         */
        @Test
        public void testRequestInterleaving() throws Exception {
-
-               TestParameters parameters = new TestParameters();
-               parameters.jobIDPathParameter.resolve(PATH_JOB_ID);
-               
parameters.jobIDQueryParameter.resolve(Collections.singletonList(QUERY_JOB_ID));
+               final HandlerBlocker handlerBlocker = new 
HandlerBlocker(timeout);
+               testHandler.handlerBody = id -> {
+                       if (id == 1) {
+                               handlerBlocker.arriveAndBlock();
+                       }
+                       return CompletableFuture.completedFuture(new 
TestResponse(id));
+               };
 
                // send first request and wait until the handler blocks
-               CompletableFuture<TestResponse> response1;
-
-               synchronized (TestHandler.LOCK) {
-                       response1 = restClient.sendRequest(
-                               serverAddress.getHostName(),
-                               serverAddress.getPort(),
-                               new TestHeaders(),
-                               parameters,
-                               new TestRequest(1));
-                       TestHandler.LOCK.wait();
-               }
+               final CompletableFuture<TestResponse> response1 = 
sendRequestToTestHandler(new TestRequest(1));
+               handlerBlocker.awaitRequestToArrive();
 
                // send second request and verify response
-               CompletableFuture<TestResponse> response2 = 
restClient.sendRequest(
-                       serverAddress.getHostName(),
-                       serverAddress.getPort(),
-                       new TestHeaders(),
-                       parameters,
-                       new TestRequest(2));
+               final CompletableFuture<TestResponse> response2 = 
sendRequestToTestHandler(new TestRequest(2));
                assertEquals(2, response2.get().id);
 
                // wake up blocked handler
-               synchronized (TestHandler.LOCK) {
-                       TestHandler.LOCK.notifyAll();
-               }
+               handlerBlocker.unblockRequest();
+
                // verify response to first request
                assertEquals(1, response1.get().id);
        }
@@ -338,41 +329,34 @@ public class RestServerEndpointITCase extends TestLogger {
        }
 
        /**
-        * Tests that requests and responses larger than {@link 
#TEST_REST_MAX_CONTENT_LENGTH}
-        * are rejected by the server and client, respectively.
+        * Tests that requests larger than {@link 
#TEST_REST_MAX_CONTENT_LENGTH} are rejected.
         */
        @Test
-       public void testMaxContentLengthLimit() throws Exception {
-               final TestParameters parameters = new TestParameters();
-               parameters.jobIDPathParameter.resolve(PATH_JOB_ID);
-               
parameters.jobIDQueryParameter.resolve(Collections.singletonList(QUERY_JOB_ID));
-
-               CompletableFuture<TestResponse> response;
-               response = restClient.sendRequest(
-                       serverAddress.getHostName(),
-                       serverAddress.getPort(),
-                       new TestHeaders(),
-                       parameters,
-                       new TestRequest(2, 
createStringOfSize(TEST_REST_MAX_CONTENT_LENGTH)));
+       public void testShouldRespectMaxContentLengthLimitForRequests() throws 
Exception {
+               testHandler.handlerBody = id -> {
+                       throw new AssertionError("Request should not arrive at 
server.");
+               };
 
                try {
-                       response.get();
+                       sendRequestToTestHandler(new TestRequest(2, 
createStringOfSize(TEST_REST_MAX_CONTENT_LENGTH))).get();
                        fail("Expected exception not thrown");
                } catch (final ExecutionException e) {
                        final Throwable throwable = 
ExceptionUtils.stripExecutionException(e);
                        assertThat(throwable, 
instanceOf(RestClientException.class));
                        assertThat(throwable.getMessage(), containsString("Try 
to raise"));
                }
+       }
 
-               response = restClient.sendRequest(
-                       serverAddress.getHostName(),
-                       serverAddress.getPort(),
-                       new TestHeaders(),
-                       parameters,
-                       new TestRequest(TestHandler.LARGE_RESPONSE_BODY_ID));
+       /**
+        * Tests that responses larger than {@link 
#TEST_REST_MAX_CONTENT_LENGTH} are rejected.
+        */
+       @Test
+       public void testShouldRespectMaxContentLengthLimitForResponses() throws 
Exception {
+               testHandler.handlerBody = id -> 
CompletableFuture.completedFuture(
+                       new TestResponse(id, 
createStringOfSize(TEST_REST_MAX_CONTENT_LENGTH)));
 
                try {
-                       response.get();
+                       sendRequestToTestHandler(new TestRequest(1)).get();
                        fail("Expected exception not thrown");
                } catch (final ExecutionException e) {
                        final Throwable throwable = 
ExceptionUtils.stripExecutionException(e);
@@ -555,37 +539,31 @@ public class RestServerEndpointITCase extends TestLogger {
         */
        @Test
        public void testShouldWaitForHandlersWhenClosing() throws Exception {
-               final CompletableFuture<Void> closeHandlerFuture = new 
CompletableFuture<>();
-               testHandler.closeFuture = closeHandlerFuture;
-
-               // Initiate closing RestServerEndpoint but the handler should 
block.
+               testHandler.closeFuture = new CompletableFuture<>();
+               final HandlerBlocker handlerBlocker = new 
HandlerBlocker(timeout);
+               testHandler.handlerBody = id -> {
+                       // Intentionally schedule the work on a different 
thread. This is to simulate
+                       // handlers where the CompletableFuture is finished by 
the RPC framework.
+                       return CompletableFuture.supplyAsync(() -> {
+                               handlerBlocker.arriveAndBlock();
+                               return new TestResponse(id);
+                       });
+               };
+
+               // Initiate closing RestServerEndpoint but the test handler 
should block.
                final CompletableFuture<Void> closeRestServerEndpointFuture = 
serverEndpoint.closeAsync();
                assertThat(closeRestServerEndpointFuture.isDone(), is(false));
 
-               final TestParameters parameters = new TestParameters();
-               parameters.jobIDPathParameter.resolve(PATH_JOB_ID);
-               
parameters.jobIDQueryParameter.resolve(Collections.singletonList(QUERY_JOB_ID));
-
-               final CompletableFuture<TestResponse> request;
-               synchronized (TestHandler.LOCK) {
-                       request = restClient.sendRequest(
-                               serverAddress.getHostName(),
-                               serverAddress.getPort(),
-                               new TestHeaders(),
-                               parameters,
-                               new TestRequest(1));
-                       TestHandler.LOCK.wait();
-               }
+               final CompletableFuture<TestResponse> request = 
sendRequestToTestHandler(new TestRequest(1));
+               handlerBlocker.awaitRequestToArrive();
 
                // Allow handler to close but there is still one in-flight 
request which should prevent
                // the RestServerEndpoint from closing.
-               closeHandlerFuture.complete(null);
+               testHandler.closeFuture.complete(null);
                assertThat(closeRestServerEndpointFuture.isDone(), is(false));
 
                // Finish the in-flight request.
-               synchronized (TestHandler.LOCK) {
-                       TestHandler.LOCK.notifyAll();
-               }
+               handlerBlocker.unblockRequest();
 
                request.get(timeout.getSize(), timeout.getUnit());
                closeRestServerEndpointFuture.get(timeout.getSize(), 
timeout.getUnit());
@@ -633,12 +611,10 @@ public class RestServerEndpointITCase extends TestLogger {
 
        private static class TestHandler extends 
AbstractRestHandler<RestfulGateway, TestRequest, TestResponse, TestParameters> {
 
-               private static final Object LOCK = new Object();
-
-               private static final int LARGE_RESPONSE_BODY_ID = 3;
-
                private CompletableFuture<Void> closeFuture = 
CompletableFuture.completedFuture(null);
 
+               private Function<Integer, CompletableFuture<TestResponse>> 
handlerBody;
+
                TestHandler(
                                CompletableFuture<String> localAddressFuture,
                                GatewayRetriever<RestfulGateway> 
leaderRetriever,
@@ -652,31 +628,12 @@ public class RestServerEndpointITCase extends TestLogger {
                }
 
                @Override
-               protected CompletableFuture<TestResponse> 
handleRequest(@Nonnull HandlerRequest<TestRequest, TestParameters> request, 
RestfulGateway gateway) throws RestHandlerException {
+               protected CompletableFuture<TestResponse> 
handleRequest(@Nonnull HandlerRequest<TestRequest, TestParameters> request, 
RestfulGateway gateway) {
                        
assertEquals(request.getPathParameter(JobIDPathParameter.class), PATH_JOB_ID);
                        
assertEquals(request.getQueryParameter(JobIDQueryParameter.class).get(0), 
QUERY_JOB_ID);
 
                        final int id = request.getRequestBody().id;
-                       if (id == 1) {
-                               // Intentionally schedule the work on a 
different thread. This is to simulate
-                               // handlers where the CompletableFuture is 
finished by the RPC framework.
-                               return CompletableFuture.supplyAsync(() -> {
-                                       synchronized (LOCK) {
-                                               try {
-                                                       LOCK.notifyAll();
-                                                       LOCK.wait();
-                                               } catch (InterruptedException 
ignored) {
-                                               }
-                                       }
-                                       return new TestResponse(id);
-                               });
-                       } else if (id == LARGE_RESPONSE_BODY_ID) {
-                               return CompletableFuture.completedFuture(new 
TestResponse(
-                                       id,
-                                       
createStringOfSize(TEST_REST_MAX_CONTENT_LENGTH)));
-                       } else {
-                               return CompletableFuture.completedFuture(new 
TestResponse(id));
-                       }
+                       return handlerBody.apply(id);
                }
 
                @Override
@@ -685,6 +642,78 @@ public class RestServerEndpointITCase extends TestLogger {
                }
        }
 
+       private CompletableFuture<TestResponse> sendRequestToTestHandler(final 
TestRequest testRequest) {
+               try {
+                       return restClient.sendRequest(
+                               serverAddress.getHostName(),
+                               serverAddress.getPort(),
+                               new TestHeaders(),
+                               createTestParameters(),
+                               testRequest);
+               } catch (final IOException e) {
+                       throw new RuntimeException(e);
+               }
+       }
+
+       private static TestParameters createTestParameters() {
+               final TestParameters parameters = new TestParameters();
+               parameters.jobIDPathParameter.resolve(PATH_JOB_ID);
+               
parameters.jobIDQueryParameter.resolve(Collections.singletonList(QUERY_JOB_ID));
+               return parameters;
+       }
+
+       /**
+        * This is a helper class for tests that require to have fine-grained 
control over HTTP
+        * requests so that they are not dispatched immediately.
+        */
+       private static class HandlerBlocker {
+
+               private final Time timeout;
+
+               private final CountDownLatch requestArrivedLatch = new 
CountDownLatch(1);
+
+               private final CountDownLatch finishRequestLatch = new 
CountDownLatch(1);
+
+               private HandlerBlocker(final Time timeout) {
+                       this.timeout = checkNotNull(timeout);
+               }
+
+               /**
+                * Waits until {@link #arriveAndBlock()} is called.
+                */
+               public void awaitRequestToArrive() {
+                       try {
+                               
assertTrue(requestArrivedLatch.await(timeout.getSize(), timeout.getUnit()));
+                       } catch (final InterruptedException e) {
+                               Thread.currentThread().interrupt();
+                       }
+               }
+
+               /**
+                * Signals that the request arrived. This method blocks until 
{@link #unblockRequest()} is
+                * called.
+                */
+               public void arriveAndBlock() {
+                       markRequestArrived();
+                       try {
+                               
assertTrue(finishRequestLatch.await(timeout.getSize(), timeout.getUnit()));
+                       } catch (final InterruptedException e) {
+                               Thread.currentThread().interrupt();
+                       }
+               }
+
+               /**
+                * @see #arriveAndBlock()
+                */
+               public void unblockRequest() {
+                       finishRequestLatch.countDown();
+               }
+
+               private void markRequestArrived() {
+                       requestArrivedLatch.countDown();
+               }
+       }
+
        static class TestRestClient extends RestClient {
 
                TestRestClient(RestClientConfiguration configuration) {

Reply via email to