This is an automated email from the ASF dual-hosted git repository. gary pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit e96cdf12d6ff42079b39f5ef8966258e60083b48 Author: gyao <g...@data-artisans.com> AuthorDate: Wed Oct 17 11:30:10 2018 +0200 [FLINK-10309][tests] Replace wait-notify with CountDownLatches. Replace the use of wait-notify with CountDownLatches in RestServerEndpointITCase. The wait() method can have spurious wakeups which was previously not considered. Also refactor the tests that use the TestHandler to avoid code duplication, and to separate concerns. --- .../runtime/rest/RestServerEndpointITCase.java | 215 ++++++++++++--------- 1 file changed, 122 insertions(+), 93 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java index 48492f8..0c28745 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java @@ -91,11 +91,14 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import java.util.stream.Collectors; import static java.util.Objects.requireNonNull; +import static org.apache.flink.util.Preconditions.checkNotNull; import static org.hamcrest.CoreMatchers.hasItems; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.instanceOf; @@ -267,37 +270,25 @@ public class RestServerEndpointITCase extends TestLogger { */ @Test public void testRequestInterleaving() throws Exception { - - TestParameters parameters = new TestParameters(); - parameters.jobIDPathParameter.resolve(PATH_JOB_ID); - parameters.jobIDQueryParameter.resolve(Collections.singletonList(QUERY_JOB_ID)); + final HandlerBlocker handlerBlocker = new HandlerBlocker(timeout); + testHandler.handlerBody = id -> { + if (id == 1) { + handlerBlocker.arriveAndBlock(); + } + return CompletableFuture.completedFuture(new TestResponse(id)); + }; // send first request and wait until the handler blocks - CompletableFuture<TestResponse> response1; - - synchronized (TestHandler.LOCK) { - response1 = restClient.sendRequest( - serverAddress.getHostName(), - serverAddress.getPort(), - new TestHeaders(), - parameters, - new TestRequest(1)); - TestHandler.LOCK.wait(); - } + final CompletableFuture<TestResponse> response1 = sendRequestToTestHandler(new TestRequest(1)); + handlerBlocker.awaitRequestToArrive(); // send second request and verify response - CompletableFuture<TestResponse> response2 = restClient.sendRequest( - serverAddress.getHostName(), - serverAddress.getPort(), - new TestHeaders(), - parameters, - new TestRequest(2)); + final CompletableFuture<TestResponse> response2 = sendRequestToTestHandler(new TestRequest(2)); assertEquals(2, response2.get().id); // wake up blocked handler - synchronized (TestHandler.LOCK) { - TestHandler.LOCK.notifyAll(); - } + handlerBlocker.unblockRequest(); + // verify response to first request assertEquals(1, response1.get().id); } @@ -338,41 +329,34 @@ public class RestServerEndpointITCase extends TestLogger { } /** - * Tests that requests and responses larger than {@link #TEST_REST_MAX_CONTENT_LENGTH} - * are rejected by the server and client, respectively. + * Tests that requests larger than {@link #TEST_REST_MAX_CONTENT_LENGTH} are rejected. */ @Test - public void testMaxContentLengthLimit() throws Exception { - final TestParameters parameters = new TestParameters(); - parameters.jobIDPathParameter.resolve(PATH_JOB_ID); - parameters.jobIDQueryParameter.resolve(Collections.singletonList(QUERY_JOB_ID)); - - CompletableFuture<TestResponse> response; - response = restClient.sendRequest( - serverAddress.getHostName(), - serverAddress.getPort(), - new TestHeaders(), - parameters, - new TestRequest(2, createStringOfSize(TEST_REST_MAX_CONTENT_LENGTH))); + public void testShouldRespectMaxContentLengthLimitForRequests() throws Exception { + testHandler.handlerBody = id -> { + throw new AssertionError("Request should not arrive at server."); + }; try { - response.get(); + sendRequestToTestHandler(new TestRequest(2, createStringOfSize(TEST_REST_MAX_CONTENT_LENGTH))).get(); fail("Expected exception not thrown"); } catch (final ExecutionException e) { final Throwable throwable = ExceptionUtils.stripExecutionException(e); assertThat(throwable, instanceOf(RestClientException.class)); assertThat(throwable.getMessage(), containsString("Try to raise")); } + } - response = restClient.sendRequest( - serverAddress.getHostName(), - serverAddress.getPort(), - new TestHeaders(), - parameters, - new TestRequest(TestHandler.LARGE_RESPONSE_BODY_ID)); + /** + * Tests that responses larger than {@link #TEST_REST_MAX_CONTENT_LENGTH} are rejected. + */ + @Test + public void testShouldRespectMaxContentLengthLimitForResponses() throws Exception { + testHandler.handlerBody = id -> CompletableFuture.completedFuture( + new TestResponse(id, createStringOfSize(TEST_REST_MAX_CONTENT_LENGTH))); try { - response.get(); + sendRequestToTestHandler(new TestRequest(1)).get(); fail("Expected exception not thrown"); } catch (final ExecutionException e) { final Throwable throwable = ExceptionUtils.stripExecutionException(e); @@ -555,37 +539,31 @@ public class RestServerEndpointITCase extends TestLogger { */ @Test public void testShouldWaitForHandlersWhenClosing() throws Exception { - final CompletableFuture<Void> closeHandlerFuture = new CompletableFuture<>(); - testHandler.closeFuture = closeHandlerFuture; - - // Initiate closing RestServerEndpoint but the handler should block. + testHandler.closeFuture = new CompletableFuture<>(); + final HandlerBlocker handlerBlocker = new HandlerBlocker(timeout); + testHandler.handlerBody = id -> { + // Intentionally schedule the work on a different thread. This is to simulate + // handlers where the CompletableFuture is finished by the RPC framework. + return CompletableFuture.supplyAsync(() -> { + handlerBlocker.arriveAndBlock(); + return new TestResponse(id); + }); + }; + + // Initiate closing RestServerEndpoint but the test handler should block. final CompletableFuture<Void> closeRestServerEndpointFuture = serverEndpoint.closeAsync(); assertThat(closeRestServerEndpointFuture.isDone(), is(false)); - final TestParameters parameters = new TestParameters(); - parameters.jobIDPathParameter.resolve(PATH_JOB_ID); - parameters.jobIDQueryParameter.resolve(Collections.singletonList(QUERY_JOB_ID)); - - final CompletableFuture<TestResponse> request; - synchronized (TestHandler.LOCK) { - request = restClient.sendRequest( - serverAddress.getHostName(), - serverAddress.getPort(), - new TestHeaders(), - parameters, - new TestRequest(1)); - TestHandler.LOCK.wait(); - } + final CompletableFuture<TestResponse> request = sendRequestToTestHandler(new TestRequest(1)); + handlerBlocker.awaitRequestToArrive(); // Allow handler to close but there is still one in-flight request which should prevent // the RestServerEndpoint from closing. - closeHandlerFuture.complete(null); + testHandler.closeFuture.complete(null); assertThat(closeRestServerEndpointFuture.isDone(), is(false)); // Finish the in-flight request. - synchronized (TestHandler.LOCK) { - TestHandler.LOCK.notifyAll(); - } + handlerBlocker.unblockRequest(); request.get(timeout.getSize(), timeout.getUnit()); closeRestServerEndpointFuture.get(timeout.getSize(), timeout.getUnit()); @@ -633,12 +611,10 @@ public class RestServerEndpointITCase extends TestLogger { private static class TestHandler extends AbstractRestHandler<RestfulGateway, TestRequest, TestResponse, TestParameters> { - private static final Object LOCK = new Object(); - - private static final int LARGE_RESPONSE_BODY_ID = 3; - private CompletableFuture<Void> closeFuture = CompletableFuture.completedFuture(null); + private Function<Integer, CompletableFuture<TestResponse>> handlerBody; + TestHandler( CompletableFuture<String> localAddressFuture, GatewayRetriever<RestfulGateway> leaderRetriever, @@ -652,31 +628,12 @@ public class RestServerEndpointITCase extends TestLogger { } @Override - protected CompletableFuture<TestResponse> handleRequest(@Nonnull HandlerRequest<TestRequest, TestParameters> request, RestfulGateway gateway) throws RestHandlerException { + protected CompletableFuture<TestResponse> handleRequest(@Nonnull HandlerRequest<TestRequest, TestParameters> request, RestfulGateway gateway) { assertEquals(request.getPathParameter(JobIDPathParameter.class), PATH_JOB_ID); assertEquals(request.getQueryParameter(JobIDQueryParameter.class).get(0), QUERY_JOB_ID); final int id = request.getRequestBody().id; - if (id == 1) { - // Intentionally schedule the work on a different thread. This is to simulate - // handlers where the CompletableFuture is finished by the RPC framework. - return CompletableFuture.supplyAsync(() -> { - synchronized (LOCK) { - try { - LOCK.notifyAll(); - LOCK.wait(); - } catch (InterruptedException ignored) { - } - } - return new TestResponse(id); - }); - } else if (id == LARGE_RESPONSE_BODY_ID) { - return CompletableFuture.completedFuture(new TestResponse( - id, - createStringOfSize(TEST_REST_MAX_CONTENT_LENGTH))); - } else { - return CompletableFuture.completedFuture(new TestResponse(id)); - } + return handlerBody.apply(id); } @Override @@ -685,6 +642,78 @@ public class RestServerEndpointITCase extends TestLogger { } } + private CompletableFuture<TestResponse> sendRequestToTestHandler(final TestRequest testRequest) { + try { + return restClient.sendRequest( + serverAddress.getHostName(), + serverAddress.getPort(), + new TestHeaders(), + createTestParameters(), + testRequest); + } catch (final IOException e) { + throw new RuntimeException(e); + } + } + + private static TestParameters createTestParameters() { + final TestParameters parameters = new TestParameters(); + parameters.jobIDPathParameter.resolve(PATH_JOB_ID); + parameters.jobIDQueryParameter.resolve(Collections.singletonList(QUERY_JOB_ID)); + return parameters; + } + + /** + * This is a helper class for tests that require to have fine-grained control over HTTP + * requests so that they are not dispatched immediately. + */ + private static class HandlerBlocker { + + private final Time timeout; + + private final CountDownLatch requestArrivedLatch = new CountDownLatch(1); + + private final CountDownLatch finishRequestLatch = new CountDownLatch(1); + + private HandlerBlocker(final Time timeout) { + this.timeout = checkNotNull(timeout); + } + + /** + * Waits until {@link #arriveAndBlock()} is called. + */ + public void awaitRequestToArrive() { + try { + assertTrue(requestArrivedLatch.await(timeout.getSize(), timeout.getUnit())); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + /** + * Signals that the request arrived. This method blocks until {@link #unblockRequest()} is + * called. + */ + public void arriveAndBlock() { + markRequestArrived(); + try { + assertTrue(finishRequestLatch.await(timeout.getSize(), timeout.getUnit())); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + /** + * @see #arriveAndBlock() + */ + public void unblockRequest() { + finishRequestLatch.countDown(); + } + + private void markRequestArrived() { + requestArrivedLatch.countDown(); + } + } + static class TestRestClient extends RestClient { TestRestClient(RestClientConfiguration configuration) {