This is an automated email from the ASF dual-hosted git repository. shengkai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new b48cddf12a3 [FLINK-30416][sql-gateway] Add configureSession REST API in the SQL Gateway (#21525) b48cddf12a3 is described below commit b48cddf12a3171cf1b6ddaa675a6b81eea1254bf Author: yuzelin <33053040+yuze...@users.noreply.github.com> AuthorDate: Wed Jan 4 16:21:27 2023 +0800 [FLINK-30416][sql-gateway] Add configureSession REST API in the SQL Gateway (#21525) This closes #21525 --- .../table/gateway/rest/SqlGatewayRestEndpoint.java | 8 + .../handler/session/ConfigureSessionHandler.java | 65 ++++ .../rest/header/SqlGatewayMessageHeaders.java | 7 +- .../header/session/ConfigureSessionHeaders.java | 101 ++++++ .../session/ConfigureSessionRequestBody.java | 57 ++++ .../rest/util/SqlGatewayRestAPIVersion.java | 31 +- ...CaseITTest.java => OperationRelatedITCase.java} | 2 +- ...stAPIITTestBase.java => RestAPIITCaseBase.java} | 23 +- ...onCaseITTest.java => SessionRelatedITCase.java} | 71 ++-- .../gateway/rest/SqlGatewayRestEndpointITCase.java | 339 +++++++++---------- .../SqlGatewayRestEndpointStatementITCase.java | 32 +- .../rest/{UtilCaseITTest.java => UtilITCase.java} | 2 +- .../table/gateway/rest/util/TestingRestClient.java | 51 +++ .../{ => util}/TestingSqlGatewayRestEndpoint.java | 16 +- .../resources/sql_gateway_rest_api_v2.snapshot | 357 +++++++++++++++++++++ 15 files changed, 933 insertions(+), 229 deletions(-) diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpoint.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpoint.java index 41afe0709f3..3d063d789ea 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpoint.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpoint.java @@ -28,6 +28,7 @@ import org.apache.flink.table.gateway.rest.handler.operation.CancelOperationHand import org.apache.flink.table.gateway.rest.handler.operation.CloseOperationHandler; import org.apache.flink.table.gateway.rest.handler.operation.GetOperationStatusHandler; import org.apache.flink.table.gateway.rest.handler.session.CloseSessionHandler; +import org.apache.flink.table.gateway.rest.handler.session.ConfigureSessionHandler; import org.apache.flink.table.gateway.rest.handler.session.GetSessionConfigHandler; import org.apache.flink.table.gateway.rest.handler.session.OpenSessionHandler; import org.apache.flink.table.gateway.rest.handler.session.TriggerSessionHeartbeatHandler; @@ -39,6 +40,7 @@ import org.apache.flink.table.gateway.rest.header.operation.CancelOperationHeade import org.apache.flink.table.gateway.rest.header.operation.CloseOperationHeaders; import org.apache.flink.table.gateway.rest.header.operation.GetOperationStatusHeaders; import org.apache.flink.table.gateway.rest.header.session.CloseSessionHeaders; +import org.apache.flink.table.gateway.rest.header.session.ConfigureSessionHeaders; import org.apache.flink.table.gateway.rest.header.session.GetSessionConfigHeaders; import org.apache.flink.table.gateway.rest.header.session.OpenSessionHeaders; import org.apache.flink.table.gateway.rest.header.session.TriggerSessionHeartbeatHeaders; @@ -90,6 +92,12 @@ public class SqlGatewayRestEndpoint extends RestServerEndpoint implements SqlGat service, responseHeaders, CloseSessionHeaders.getInstance()); handlers.add(Tuple2.of(CloseSessionHeaders.getInstance(), closeSessionHandler)); + // Configure session + ConfigureSessionHandler configureSessionHandler = + new ConfigureSessionHandler( + service, responseHeaders, ConfigureSessionHeaders.getINSTANCE()); + handlers.add(Tuple2.of(ConfigureSessionHeaders.getINSTANCE(), configureSessionHandler)); + // Get session configuration GetSessionConfigHandler getSessionConfigHandler = new GetSessionConfigHandler( diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/session/ConfigureSessionHandler.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/session/ConfigureSessionHandler.java new file mode 100644 index 00000000000..0b69e73ccde --- /dev/null +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/session/ConfigureSessionHandler.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.rest.handler.session; + +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.table.gateway.api.SqlGatewayService; +import org.apache.flink.table.gateway.api.session.SessionHandle; +import org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler; +import org.apache.flink.table.gateway.rest.message.session.ConfigureSessionRequestBody; +import org.apache.flink.table.gateway.rest.message.session.SessionHandleIdPathParameter; +import org.apache.flink.table.gateway.rest.message.session.SessionMessageParameters; +import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion; + +import javax.annotation.Nonnull; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** Handler to configure a session with statement. */ +public class ConfigureSessionHandler + extends AbstractSqlGatewayRestHandler< + ConfigureSessionRequestBody, EmptyResponseBody, SessionMessageParameters> { + + public ConfigureSessionHandler( + SqlGatewayService service, + Map<String, String> responseHeaders, + MessageHeaders<ConfigureSessionRequestBody, EmptyResponseBody, SessionMessageParameters> + messageHeaders) { + super(service, responseHeaders, messageHeaders); + } + + @Override + protected CompletableFuture<EmptyResponseBody> handleRequest( + SqlGatewayRestAPIVersion version, + @Nonnull HandlerRequest<ConfigureSessionRequestBody> request) + throws RestHandlerException { + SessionHandle sessionHandle = request.getPathParameter(SessionHandleIdPathParameter.class); + String statement = request.getRequestBody().getStatement(); + Long timeout = request.getRequestBody().getTimeout(); + timeout = timeout == null ? 0L : timeout; + + service.configureSession(sessionHandle, statement, timeout); + + return CompletableFuture.completedFuture(EmptyResponseBody.getInstance()); + } +} diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/SqlGatewayMessageHeaders.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/SqlGatewayMessageHeaders.java index 92d35d164ba..8fbf3db4c11 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/SqlGatewayMessageHeaders.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/SqlGatewayMessageHeaders.java @@ -25,8 +25,9 @@ import org.apache.flink.runtime.rest.messages.ResponseBody; import org.apache.flink.runtime.rest.versioning.RestAPIVersion; import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion; +import java.util.Arrays; import java.util.Collection; -import java.util.Collections; +import java.util.stream.Collectors; /** * This class links {@link RequestBody}s to {@link ResponseBody}s types and contains meta-data @@ -44,6 +45,8 @@ public interface SqlGatewayMessageHeaders< @Override default Collection<? extends RestAPIVersion<?>> getSupportedAPIVersions() { - return Collections.singleton(SqlGatewayRestAPIVersion.V1); + return Arrays.stream(SqlGatewayRestAPIVersion.values()) + .filter(SqlGatewayRestAPIVersion::isStableVersion) + .collect(Collectors.toList()); } } diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/session/ConfigureSessionHeaders.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/session/ConfigureSessionHeaders.java new file mode 100644 index 00000000000..a11b65b09cf --- /dev/null +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/session/ConfigureSessionHeaders.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.rest.header.session; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.versioning.RestAPIVersion; +import org.apache.flink.table.gateway.rest.header.SqlGatewayMessageHeaders; +import org.apache.flink.table.gateway.rest.message.session.ConfigureSessionRequestBody; +import org.apache.flink.table.gateway.rest.message.session.SessionHandleIdPathParameter; +import org.apache.flink.table.gateway.rest.message.session.SessionMessageParameters; +import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import java.util.Collection; +import java.util.Collections; + +/** Message headers for configuring a session. */ +public class ConfigureSessionHeaders + implements SqlGatewayMessageHeaders< + ConfigureSessionRequestBody, EmptyResponseBody, SessionMessageParameters> { + + private static final ConfigureSessionHeaders INSTANCE = new ConfigureSessionHeaders(); + + private static final String URL = + "/sessions/:" + SessionHandleIdPathParameter.KEY + "/configure-session"; + + @Override + public Class<EmptyResponseBody> getResponseClass() { + return EmptyResponseBody.class; + } + + @Override + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.OK; + } + + @Override + public String getDescription() { + return "Configures the session with the statement which could be:\n" + + "CREATE TABLE, DROP TABLE, ALTER TABLE, " + + "CREATE DATABASE, DROP DATABASE, ALTER DATABASE, " + + "CREATE FUNCTION, DROP FUNCTION, ALTER FUNCTION, " + + "CREATE CATALOG, DROP CATALOG, " + + "USE CATALOG, USE [CATALOG.]DATABASE, " + + "CREATE VIEW, DROP VIEW, " + + "LOAD MODULE, UNLOAD MODULE, USE MODULE, " + + "ADD JAR."; + } + + @Override + public Class<ConfigureSessionRequestBody> getRequestClass() { + return ConfigureSessionRequestBody.class; + } + + @Override + public SessionMessageParameters getUnresolvedMessageParameters() { + return new SessionMessageParameters(); + } + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.POST; + } + + @Override + public String getTargetRestEndpointURL() { + return URL; + } + + @Override + public Collection<? extends RestAPIVersion<?>> getSupportedAPIVersions() { + return Collections.singleton(SqlGatewayRestAPIVersion.V2); + } + + public static ConfigureSessionHeaders getINSTANCE() { + return INSTANCE; + } + + @Override + public String operationId() { + return "configureSession"; + } +} diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/session/ConfigureSessionRequestBody.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/session/ConfigureSessionRequestBody.java new file mode 100644 index 00000000000..4b16ffa1413 --- /dev/null +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/session/ConfigureSessionRequestBody.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.rest.message.session; + +import org.apache.flink.runtime.rest.messages.RequestBody; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import javax.annotation.Nullable; + +/** {@link RequestBody} for configuring a session. */ +@JsonInclude(JsonInclude.Include.NON_NULL) +public class ConfigureSessionRequestBody implements RequestBody { + + private static final String FIELD_NAME_STATEMENT = "statement"; + private static final String FIELD_NAME_EXECUTION_TIMEOUT = "executionTimeout"; + + @JsonProperty(FIELD_NAME_STATEMENT) + private final String statement; + + @JsonProperty(FIELD_NAME_EXECUTION_TIMEOUT) + @Nullable + private final Long timeout; + + public ConfigureSessionRequestBody( + @JsonProperty(FIELD_NAME_STATEMENT) String statement, + @Nullable @JsonProperty(FIELD_NAME_EXECUTION_TIMEOUT) Long timeout) { + this.statement = statement; + this.timeout = timeout; + } + + public String getStatement() { + return statement; + } + + @Nullable + public Long getTimeout() { + return timeout; + } +} diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestAPIVersion.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestAPIVersion.java index cc5706f5d12..2b84aef323a 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestAPIVersion.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestAPIVersion.java @@ -20,6 +20,11 @@ package org.apache.flink.table.gateway.rest.util; import org.apache.flink.runtime.rest.versioning.RestAPIVersion; import org.apache.flink.table.gateway.api.endpoint.EndpointVersion; +import org.apache.flink.util.Preconditions; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; /** * An enum for all versions of the Sql Gateway REST API. @@ -34,8 +39,16 @@ public enum SqlGatewayRestAPIVersion implements RestAPIVersion<SqlGatewayRestAPIVersion>, EndpointVersion { // The bigger the ordinal(its position in enum declaration), the higher the level of the // version. + + // V0 is just for test V0(false, false), - V1(true, true); + + // V1 introduces basic APIs for rest endpoint + V1(false, true), + + // V2 adds support for configuring Session and allows to serialize the RowData with PLAIN_TEXT + // or JSON format. + V2(true, true); private final boolean isDefaultVersion; @@ -90,7 +103,21 @@ public enum SqlGatewayRestAPIVersion try { return valueOf(uri.substring(1, slashIndex).toUpperCase()); } catch (Exception e) { - return V1; + return getDefaultVersion(); } } + + public static SqlGatewayRestAPIVersion getDefaultVersion() { + List<SqlGatewayRestAPIVersion> versions = + Arrays.stream(SqlGatewayRestAPIVersion.values()) + .filter(SqlGatewayRestAPIVersion::isDefaultVersion) + .collect(Collectors.toList()); + Preconditions.checkState( + versions.size() == 1, + String.format( + "Only one default version of Sql Gateway Rest API, but found %s.", + versions.size())); + + return versions.get(0); + } } diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/OperationCaseITTest.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/OperationRelatedITCase.java similarity index 99% rename from flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/OperationCaseITTest.java rename to flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/OperationRelatedITCase.java index ab4c1803294..b5da70444e5 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/OperationCaseITTest.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/OperationRelatedITCase.java @@ -52,7 +52,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; * Test basic logic of handlers inherited from {@link AbstractSqlGatewayRestHandler} in operation * related cases. */ -class OperationCaseITTest extends RestAPIITTestBase { +class OperationRelatedITCase extends RestAPIITCaseBase { private static final String sessionName = "test"; private static final Map<String, String> properties = new HashMap<>(); diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/RestAPIITTestBase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/RestAPIITCaseBase.java similarity index 81% rename from flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/RestAPIITTestBase.java rename to flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/RestAPIITCaseBase.java index ceef72b3340..2970d2f25a1 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/RestAPIITTestBase.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/RestAPIITCaseBase.java @@ -20,36 +20,33 @@ package org.apache.flink.table.gateway.rest; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.rest.RestClient; import org.apache.flink.runtime.rest.messages.MessageHeaders; import org.apache.flink.runtime.rest.messages.MessageParameters; import org.apache.flink.runtime.rest.messages.RequestBody; import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.table.gateway.rest.util.TestingRestClient; import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension; import org.apache.flink.test.junit5.MiniClusterExtension; -import org.apache.flink.util.ExecutorUtils; -import org.apache.flink.util.concurrent.ExecutorThreadFactory; -import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Order; import org.junit.jupiter.api.extension.RegisterExtension; +import javax.annotation.Nullable; + import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import static org.apache.flink.table.gateway.rest.util.RestConfigUtils.getBaseConfig; import static org.apache.flink.table.gateway.rest.util.RestConfigUtils.getFlinkConfig; +import static org.apache.flink.table.gateway.rest.util.TestingRestClient.getTestingRestClient; import static org.apache.flink.util.Preconditions.checkNotNull; /** The base class for Rest API IT test. */ -abstract class RestAPIITTestBase { +abstract class RestAPIITCaseBase { @RegisterExtension @Order(1) @@ -60,9 +57,8 @@ abstract class RestAPIITTestBase { protected static final SqlGatewayServiceExtension SQL_GATEWAY_SERVICE_EXTENSION = new SqlGatewayServiceExtension(MINI_CLUSTER::getClientConfiguration); - @Nullable private static RestClient restClient = null; + @Nullable private static TestingRestClient restClient = null; @Nullable private static String targetAddress = null; - @Nullable private static ExecutorService executorService = null; @Nullable private static SqlGatewayRestEndpoint sqlGatewayRestEndpoint = null; private static int port = 0; @@ -75,10 +71,7 @@ abstract class RestAPIITTestBase { new SqlGatewayRestEndpoint(config, SQL_GATEWAY_SERVICE_EXTENSION.getService()); sqlGatewayRestEndpoint.start(); InetSocketAddress serverAddress = checkNotNull(sqlGatewayRestEndpoint.getServerAddress()); - executorService = - Executors.newFixedThreadPool( - 1, new ExecutorThreadFactory("rest-client-thread-pool")); - restClient = new RestClient(new Configuration(), executorService); + restClient = getTestingRestClient(); targetAddress = serverAddress.getHostName(); port = serverAddress.getPort(); } @@ -89,8 +82,6 @@ abstract class RestAPIITTestBase { sqlGatewayRestEndpoint.close(); checkNotNull(restClient); restClient.shutdown(Time.seconds(3)); - checkNotNull(executorService); - ExecutorUtils.gracefulShutdown(3, TimeUnit.SECONDS, executorService); } public < diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SessionCaseITTest.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SessionRelatedITCase.java similarity index 80% rename from flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SessionCaseITTest.java rename to flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SessionRelatedITCase.java index afc27904911..2cef32f269f 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SessionCaseITTest.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SessionRelatedITCase.java @@ -25,16 +25,20 @@ import org.apache.flink.table.gateway.api.session.SessionHandle; import org.apache.flink.table.gateway.api.utils.SqlGatewayException; import org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler; import org.apache.flink.table.gateway.rest.header.session.CloseSessionHeaders; +import org.apache.flink.table.gateway.rest.header.session.ConfigureSessionHeaders; import org.apache.flink.table.gateway.rest.header.session.GetSessionConfigHeaders; import org.apache.flink.table.gateway.rest.header.session.OpenSessionHeaders; import org.apache.flink.table.gateway.rest.header.session.TriggerSessionHeartbeatHeaders; import org.apache.flink.table.gateway.rest.message.session.CloseSessionResponseBody; +import org.apache.flink.table.gateway.rest.message.session.ConfigureSessionRequestBody; import org.apache.flink.table.gateway.rest.message.session.GetSessionConfigResponseBody; import org.apache.flink.table.gateway.rest.message.session.OpenSessionRequestBody; import org.apache.flink.table.gateway.rest.message.session.OpenSessionResponseBody; import org.apache.flink.table.gateway.rest.message.session.SessionMessageParameters; import org.apache.flink.table.gateway.service.session.Session; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.util.ArrayList; @@ -50,13 +54,12 @@ import java.util.concurrent.ExecutionException; import static org.apache.flink.table.gateway.rest.handler.session.CloseSessionHandler.CLOSE_MESSAGE; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.junit.jupiter.api.Assertions.assertNotNull; /** * Test basic logic of handlers inherited from {@link AbstractSqlGatewayRestHandler} in session * related cases. */ -class SessionCaseITTest extends RestAPIITTestBase { +class SessionRelatedITCase extends RestAPIITCaseBase { private static final String SESSION_NAME = "test"; private static final Map<String, String> properties = new HashMap<>(); @@ -77,6 +80,33 @@ class SessionCaseITTest extends RestAPIITTestBase { CloseSessionHeaders.getInstance(); private static final EmptyRequestBody emptyRequestBody = EmptyRequestBody.getInstance(); + private SessionHandle sessionHandle; + + private SessionMessageParameters sessionMessageParameters; + + @BeforeEach + public void setUp() throws Exception { + CompletableFuture<OpenSessionResponseBody> response = + sendRequest(openSessionHeaders, emptyParameters, openSessionRequestBody); + String sessionHandleId = response.get().getSessionHandle(); + assertThat(sessionHandleId).isNotNull(); + + sessionHandle = new SessionHandle(UUID.fromString(sessionHandleId)); + assertThat(SQL_GATEWAY_SERVICE_EXTENSION.getSessionManager().getSession(sessionHandle)) + .isNotNull(); + + sessionMessageParameters = new SessionMessageParameters(sessionHandle); + } + + @AfterEach + public void cleanUp() throws Exception { + CompletableFuture<CloseSessionResponseBody> response = + sendRequest(closeSessionHeaders, sessionMessageParameters, emptyRequestBody); + + String status = response.get().getStatus(); + assertThat(status).isEqualTo(CLOSE_MESSAGE); + } + @Test void testCreateAndCloseSessions() throws Exception { List<SessionHandle> sessionHandles = new ArrayList<>(); @@ -117,16 +147,6 @@ class SessionCaseITTest extends RestAPIITTestBase { @Test void testGetSessionConfiguration() throws Exception { - CompletableFuture<OpenSessionResponseBody> response = - sendRequest(openSessionHeaders, emptyParameters, openSessionRequestBody); - String sessionHandleId = response.get().getSessionHandle(); - assertThat(sessionHandleId).isNotNull(); - SessionHandle sessionHandle = new SessionHandle(UUID.fromString(sessionHandleId)); - assertThat(SQL_GATEWAY_SERVICE_EXTENSION.getSessionManager().getSession(sessionHandle)) - .isNotNull(); - - SessionMessageParameters sessionMessageParameters = - new SessionMessageParameters(sessionHandle); CompletableFuture<GetSessionConfigResponseBody> future = sendRequest( GetSessionConfigHeaders.getInstance(), @@ -140,19 +160,12 @@ class SessionCaseITTest extends RestAPIITTestBase { @Test void testTouchSession() throws Exception { - CompletableFuture<OpenSessionResponseBody> response = - sendRequest(openSessionHeaders, emptyParameters, openSessionRequestBody); - String sessionHandleId = response.get().getSessionHandle(); - assertNotNull(sessionHandleId); - SessionHandle sessionHandle = new SessionHandle(UUID.fromString(sessionHandleId)); Session session = SQL_GATEWAY_SERVICE_EXTENSION.getSessionManager().getSession(sessionHandle); assertThat(session).isNotNull(); long lastAccessTime = session.getLastAccessTime(); - SessionMessageParameters sessionMessageParameters = - new SessionMessageParameters(sessionHandle); CompletableFuture<EmptyResponseBody> future = sendRequest( TriggerSessionHeartbeatHeaders.getInstance(), @@ -161,4 +174,24 @@ class SessionCaseITTest extends RestAPIITTestBase { future.get(); assertThat(session.getLastAccessTime() > lastAccessTime).isTrue(); } + + @Test + void testConfigureSession() throws Exception { + ConfigureSessionRequestBody configureSessionRequestBody = + new ConfigureSessionRequestBody("set 'test' = 'configure';", -1L); + + CompletableFuture<EmptyResponseBody> response = + sendRequest( + ConfigureSessionHeaders.getINSTANCE(), + sessionMessageParameters, + configureSessionRequestBody); + response.get(); + + assertThat( + SQL_GATEWAY_SERVICE_EXTENSION + .getSessionManager() + .getSession(sessionHandle) + .getSessionConfig()) + .containsEntry("test", "configure"); + } } diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointITCase.java index 3c2a8776a33..e52a42ee354 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointITCase.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointITCase.java @@ -22,24 +22,25 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.core.testutils.BlockerSync; +import org.apache.flink.core.testutils.FlinkAssertions; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.rest.HttpMethodWrapper; -import org.apache.flink.runtime.rest.RestClient; -import org.apache.flink.runtime.rest.RestServerEndpoint; import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.RequestBody; import org.apache.flink.runtime.rest.messages.ResponseBody; import org.apache.flink.runtime.rest.util.RestClientException; +import org.apache.flink.runtime.rest.versioning.RestAPIVersion; import org.apache.flink.runtime.rpc.exceptions.EndpointNotStartedException; import org.apache.flink.table.gateway.api.SqlGatewayService; import org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler; import org.apache.flink.table.gateway.rest.header.SqlGatewayMessageHeaders; import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion; +import org.apache.flink.table.gateway.rest.util.TestingRestClient; +import org.apache.flink.table.gateway.rest.util.TestingSqlGatewayRestEndpoint; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkRuntimeException; -import org.apache.flink.util.concurrent.ExecutorThreadFactory; import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; @@ -49,44 +50,49 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.Response; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.Objects; +import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.function.Function; +import java.util.stream.Collectors; import static org.apache.flink.table.gateway.rest.util.RestConfigUtils.getBaseConfig; import static org.apache.flink.table.gateway.rest.util.RestConfigUtils.getFlinkConfig; +import static org.apache.flink.table.gateway.rest.util.TestingRestClient.getTestingRestClient; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; /** IT cases for {@link SqlGatewayRestEndpoint}. */ class SqlGatewayRestEndpointITCase { - private static final SqlGatewayService service = null; + private static final SqlGatewayService SERVICE = null; - private static RestServerEndpoint serverEndpoint; - private static RestClient restClient; + private static SqlGatewayRestEndpoint serverEndpoint; + private static TestingRestClient restClient; private static InetSocketAddress serverAddress; - private static TestBadCaseHandler testHandler; - private static TestVersionSelectionHeaders1 header1; - private static TestVersionSelectionHeaders2 header2; private static TestBadCaseHeaders badCaseHeader; - private static TestVersionHandler testVersionHandler1; - private static TestVersionHandler testVersionHandler2; + private static TestBadCaseHandler testHandler; + + private static TestVersionSelectionHeaders0 header0; + private static TestVersionSelectionHeadersNot0 headerNot0; + + private static TestVersionHandler testVersionHandler0; + private static TestVersionHandler testVersionHandlerNot0; private static Configuration config; private static final Time timeout = Time.seconds(10L); @@ -94,43 +100,38 @@ class SqlGatewayRestEndpointITCase { @BeforeEach void setup() throws Exception { // Test version cases - header1 = new TestVersionSelectionHeaders1(); - header2 = new TestVersionSelectionHeaders2(); - testVersionHandler1 = new TestVersionHandler(service, header1); - testVersionHandler2 = new TestVersionHandler(service, header2); + header0 = new TestVersionSelectionHeaders0(); + headerNot0 = new TestVersionSelectionHeadersNot0(); + testVersionHandler0 = new TestVersionHandler(SERVICE, header0); + testVersionHandlerNot0 = new TestVersionHandler(SERVICE, headerNot0); // Test exception cases badCaseHeader = new TestBadCaseHeaders(); - testHandler = new TestBadCaseHandler(service); + testHandler = new TestBadCaseHandler(SERVICE); // Init final String address = InetAddress.getLoopbackAddress().getHostAddress(); config = getBaseConfig(getFlinkConfig(address, address, "0")); serverEndpoint = - TestingSqlGatewayRestEndpoint.builder(config, service) + TestingSqlGatewayRestEndpoint.builder(config, SERVICE) .withHandler(badCaseHeader, testHandler) - .withHandler(header1, testVersionHandler1) - .withHandler(header2, testVersionHandler2) + .withHandler(header0, testVersionHandler0) + .withHandler(headerNot0, testVersionHandlerNot0) .buildAndStart(); - restClient = - new RestClient( - config, - Executors.newFixedThreadPool( - 1, new ExecutorThreadFactory("rest-client-thread-pool"))); + restClient = getTestingRestClient(); serverAddress = serverEndpoint.getServerAddress(); } @AfterEach void stop() throws Exception { - if (restClient != null) { - restClient.shutdown(timeout); + restClient.shutdown(); restClient = null; } if (serverEndpoint != null) { - serverEndpoint.closeAsync().get(timeout.getSize(), timeout.getUnit()); + serverEndpoint.stop(); serverEndpoint = null; } } @@ -138,74 +139,95 @@ class SqlGatewayRestEndpointITCase { /** Test that {@link SqlGatewayMessageHeaders} can identify the version correctly. */ @Test void testSqlGatewayMessageHeaders() throws Exception { - // The header only support V1, but send request by V0 + // The header can't support V0, but sends request by V0 assertThatThrownBy( () -> restClient.sendRequest( serverAddress.getHostName(), serverAddress.getPort(), - header2, + headerNot0, EmptyMessageParameters.getInstance(), EmptyRequestBody.getInstance(), Collections.emptyList(), SqlGatewayRestAPIVersion.V0)) - .isInstanceOf(IllegalArgumentException.class); - - // The header only support V1, send request by V1 + .satisfies( + FlinkAssertions.anyCauseMatches( + IllegalArgumentException.class, + String.format( + "The requested version V0 is not supported by the request (method=%s URL=%s). Supported versions are: %s.", + headerNot0.getHttpMethod(), + headerNot0.getTargetRestEndpointURL(), + headerNot0.getSupportedAPIVersions().stream() + .map(RestAPIVersion::getURLVersionPrefix) + .collect(Collectors.joining(","))))); + + // The header only supports V0, sends request by V0 CompletableFuture<TestResponse> specifiedVersionResponse = restClient.sendRequest( serverAddress.getHostName(), serverAddress.getPort(), - header2, + header0, EmptyMessageParameters.getInstance(), EmptyRequestBody.getInstance(), Collections.emptyList(), - SqlGatewayRestAPIVersion.V1); + SqlGatewayRestAPIVersion.V0); - TestResponse testResponse1 = specifiedVersionResponse.get(5, TimeUnit.SECONDS); - assertThat(testResponse1.getStatus()).isEqualTo("V1"); + TestResponse testResponse0 = + specifiedVersionResponse.get(timeout.getSize(), timeout.getUnit()); + assertThat(testResponse0.getStatus()).isEqualTo("V0"); - // The header only support V1, send request by latest version V1 - CompletableFuture<TestResponse> unspecifiedVersionResponse = + // The header only supports V0, lets the client get the version + CompletableFuture<TestResponse> unspecifiedVersionResponse0 = restClient.sendRequest( serverAddress.getHostName(), serverAddress.getPort(), - header2, + header0, EmptyMessageParameters.getInstance(), EmptyRequestBody.getInstance(), Collections.emptyList()); - TestResponse testResponse2 = unspecifiedVersionResponse.get(5, TimeUnit.SECONDS); - assertThat(testResponse2.getStatus()).isEqualTo("V1"); - } + TestResponse testResponse1 = + unspecifiedVersionResponse0.get(timeout.getSize(), timeout.getUnit()); + assertThat(testResponse1.getStatus()).isEqualTo("V0"); - /** Test that requests of different version are routed to correct handlers. */ - @Test - void testVersionSelection() throws Exception { - CompletableFuture<TestResponse> version1Response = + // The header supports multiple versions, lets the client get the latest version as default + CompletableFuture<TestResponse> unspecifiedVersionResponse1 = restClient.sendRequest( serverAddress.getHostName(), serverAddress.getPort(), - header1, + headerNot0, EmptyMessageParameters.getInstance(), EmptyRequestBody.getInstance(), - Collections.emptyList(), - SqlGatewayRestAPIVersion.V0); + Collections.emptyList()); - TestResponse testResponse = version1Response.get(5, TimeUnit.SECONDS); - assertThat(testResponse.getStatus()).isEqualTo("V0"); + TestResponse testResponse2 = + unspecifiedVersionResponse1.get(timeout.getSize(), timeout.getUnit()); + assertThat(testResponse2.getStatus()) + .isEqualTo( + RestAPIVersion.getLatestVersion(headerNot0.getSupportedAPIVersions()) + .name()); + } - CompletableFuture<TestResponse> version2Response = - restClient.sendRequest( - serverAddress.getHostName(), - serverAddress.getPort(), - header2, - EmptyMessageParameters.getInstance(), - EmptyRequestBody.getInstance(), - Collections.emptyList(), - SqlGatewayRestAPIVersion.V1); - TestResponse testResponse2 = version2Response.get(5, TimeUnit.SECONDS); - assertThat(testResponse2.getStatus()).isEqualTo("V1"); + /** Test that requests of different version are routed to correct handlers. */ + @Test + void testVersionSelection() throws Exception { + for (SqlGatewayRestAPIVersion version : SqlGatewayRestAPIVersion.values()) { + if (version != SqlGatewayRestAPIVersion.V0) { + CompletableFuture<TestResponse> versionResponse = + restClient.sendRequest( + serverAddress.getHostName(), + serverAddress.getPort(), + headerNot0, + EmptyMessageParameters.getInstance(), + EmptyRequestBody.getInstance(), + Collections.emptyList(), + version); + + TestResponse testResponse = + versionResponse.get(timeout.getSize(), timeout.getUnit()); + assertThat(testResponse.getStatus()).isEqualTo(version.name()); + } + } } /** @@ -219,12 +241,13 @@ class SqlGatewayRestEndpointITCase { OkHttpClient client = new OkHttpClient(); final Request request = new Request.Builder() - .url(serverEndpoint.getRestBaseUrl() + header1.getTargetRestEndpointURL()) + .url(serverEndpoint.getRestBaseUrl() + header0.getTargetRestEndpointURL()) .build(); final Response response = client.newCall(request).execute(); assert response.body() != null; - assertThat(response.body().string()).contains("V1"); + assertThat(response.body().string()) + .contains(SqlGatewayRestAPIVersion.getDefaultVersion().name()); } /** @@ -254,13 +277,13 @@ class SqlGatewayRestEndpointITCase { // send second request and verify response final CompletableFuture<TestResponse> response2 = sendRequestToTestHandler(new TestRequest(2)); - assertThat(response2.get().status).isEqualTo("2"); + assertThat(response2.get().getStatus()).isEqualTo("2"); // wake up blocked handler sync.releaseBlocker(); // verify response to first request - assertThat(response1.get().status).isEqualTo("1"); + assertThat(response1.get().getStatus()).isEqualTo("1"); } @Test @@ -268,29 +291,35 @@ class SqlGatewayRestEndpointITCase { assertThatThrownBy( () -> { try (TestingSqlGatewayRestEndpoint restServerEndpoint = - TestingSqlGatewayRestEndpoint.builder(config, service) - .withHandler(header1, testHandler) + TestingSqlGatewayRestEndpoint.builder(config, SERVICE) + .withHandler(header0, testHandler) .withHandler(badCaseHeader, testHandler) .build()) { restServerEndpoint.start(); } }) - .isInstanceOf(FlinkRuntimeException.class); + .satisfies( + FlinkAssertions.anyCauseMatches( + FlinkRuntimeException.class, + "Duplicate REST handler instance found. Please ensure each instance is registered only once.")); } @Test - void testEndpointsMustBeUnique() { + void testHandlerRegistrationOverlappingIsForbidden() { assertThatThrownBy( () -> { try (TestingSqlGatewayRestEndpoint restServerEndpoint = - TestingSqlGatewayRestEndpoint.builder(config, service) + TestingSqlGatewayRestEndpoint.builder(config, SERVICE) .withHandler(badCaseHeader, testHandler) - .withHandler(badCaseHeader, testVersionHandler1) + .withHandler(badCaseHeader, testVersionHandler0) .build()) { restServerEndpoint.start(); } }) - .isInstanceOf(FlinkRuntimeException.class); + .satisfies( + FlinkAssertions.anyCauseMatches( + FlinkRuntimeException.class, + "REST handler registration overlaps with another registration for")); } /** @@ -338,40 +367,6 @@ class SqlGatewayRestEndpointITCase { closeRestServerEndpointFuture.get(timeout.getSize(), timeout.getUnit()); } - @Test - void testRestServerBindPort() throws Exception { - final int portRangeStart = 52300; - final int portRangeEnd = 52400; - final String address = InetAddress.getLoopbackAddress().getHostAddress(); - final Configuration sqlGatewayRestEndpointConfig = - getBaseConfig( - getFlinkConfig(address, address, portRangeStart + "-" + portRangeEnd)); - - try (RestServerEndpoint serverEndpoint1 = - TestingSqlGatewayRestEndpoint.builder(sqlGatewayRestEndpointConfig, service) - .build(); - RestServerEndpoint serverEndpoint2 = - TestingSqlGatewayRestEndpoint.builder(sqlGatewayRestEndpointConfig, service) - .build()) { - - serverEndpoint1.start(); - serverEndpoint2.start(); - - assertThat(Objects.requireNonNull(serverEndpoint1.getServerAddress()).getPort()) - .isNotEqualTo( - Objects.requireNonNull(serverEndpoint2.getServerAddress()).getPort()); - - assertThat(serverEndpoint1.getServerAddress().getPort()) - .isGreaterThanOrEqualTo(portRangeStart); - assertThat(serverEndpoint1.getServerAddress().getPort()) - .isLessThanOrEqualTo(portRangeEnd); - assertThat(serverEndpoint2.getServerAddress().getPort()) - .isGreaterThanOrEqualTo(portRangeStart); - assertThat(serverEndpoint2.getServerAddress().getPort()) - .isLessThanOrEqualTo(portRangeEnd); - } - } - @Test void testOnUnavailableRpcEndpointReturns503() { CompletableFuture<TestResponse> response = sendRequestToTestHandler(new TestRequest(3)); @@ -383,54 +378,12 @@ class SqlGatewayRestEndpointITCase { .isEqualTo(HttpResponseStatus.SERVICE_UNAVAILABLE); } - private static class TestBadCaseHandler - extends AbstractSqlGatewayRestHandler< - TestRequest, TestResponse, EmptyMessageParameters> { - - private final OneShotLatch closeLatch = new OneShotLatch(); - - private CompletableFuture<Void> closeFuture = CompletableFuture.completedFuture(null); - - private Function<Integer, CompletableFuture<TestResponse>> handlerBody; - - TestBadCaseHandler(SqlGatewayService sqlGatewayService) { - super(sqlGatewayService, Collections.emptyMap(), badCaseHeader); - } - - @Override - public CompletableFuture<Void> closeHandlerAsync() { - closeLatch.trigger(); - return closeFuture; - } - - @Override - protected CompletableFuture<TestResponse> handleRequest( - @Nullable SqlGatewayRestAPIVersion version, - @NotNull HandlerRequest<TestRequest> request) { - final int id = request.getRequestBody().id; - if (id == 3) { - return FutureUtils.completedExceptionally( - new EndpointNotStartedException("test exception")); - } - return handlerBody.apply(id); - } - } - - private CompletableFuture<TestResponse> sendRequestToTestHandler( - final TestRequest testRequest) { - try { - return restClient.sendRequest( - serverAddress.getHostName(), - serverAddress.getPort(), - badCaseHeader, - EmptyMessageParameters.getInstance(), - testRequest); - } catch (final IOException e) { - throw new RuntimeException(e); - } - } + // -------------------------------------------------------------------------------------------- + // Messages + // -------------------------------------------------------------------------------------------- private static class TestRequest implements RequestBody { + public final int id; @JsonCreator @@ -441,7 +394,7 @@ class SqlGatewayRestEndpointITCase { private static class TestResponse implements ResponseBody { - public final String status; + private final String status; @JsonCreator public TestResponse(@JsonProperty("status") String status) { @@ -453,6 +406,10 @@ class SqlGatewayRestEndpointITCase { } } + // -------------------------------------------------------------------------------------------- + // Headers + // -------------------------------------------------------------------------------------------- + private static class TestBadCaseHeaders implements SqlGatewayMessageHeaders<TestRequest, TestResponse, EmptyMessageParameters> { @@ -532,35 +489,89 @@ class SqlGatewayRestEndpointITCase { } } - private static class TestVersionSelectionHeaders1 extends TestVersionSelectionHeadersBase { + private static class TestVersionSelectionHeaders0 extends TestVersionSelectionHeadersBase { @Override public Collection<SqlGatewayRestAPIVersion> getSupportedAPIVersions() { return Collections.singleton(SqlGatewayRestAPIVersion.V0); } } - private static class TestVersionSelectionHeaders2 extends TestVersionSelectionHeadersBase { + private static class TestVersionSelectionHeadersNot0 extends TestVersionSelectionHeadersBase { @Override public Collection<SqlGatewayRestAPIVersion> getSupportedAPIVersions() { - return Collections.singleton(SqlGatewayRestAPIVersion.V1); + List<SqlGatewayRestAPIVersion> versions = + new ArrayList<>(Arrays.asList(SqlGatewayRestAPIVersion.values())); + versions.remove(SqlGatewayRestAPIVersion.V0); + return versions; } } + // -------------------------------------------------------------------------------------------- + // Handlers + // -------------------------------------------------------------------------------------------- + private static class TestVersionHandler extends AbstractSqlGatewayRestHandler< EmptyRequestBody, TestResponse, EmptyMessageParameters> { TestVersionHandler( - final SqlGatewayService sqlGatewayService, TestVersionSelectionHeadersBase header) { + SqlGatewayService sqlGatewayService, TestVersionSelectionHeadersBase header) { super(sqlGatewayService, Collections.emptyMap(), header); } @Override protected CompletableFuture<TestResponse> handleRequest( @Nullable SqlGatewayRestAPIVersion version, - @NotNull HandlerRequest<EmptyRequestBody> request) { + @Nonnull HandlerRequest<EmptyRequestBody> request) { assert version != null; return CompletableFuture.completedFuture(new TestResponse(version.name())); } } + + private static class TestBadCaseHandler + extends AbstractSqlGatewayRestHandler< + TestRequest, TestResponse, EmptyMessageParameters> { + + private final OneShotLatch closeLatch = new OneShotLatch(); + + private CompletableFuture<Void> closeFuture = CompletableFuture.completedFuture(null); + + private Function<Integer, CompletableFuture<TestResponse>> handlerBody; + + TestBadCaseHandler(SqlGatewayService sqlGatewayService) { + super(sqlGatewayService, Collections.emptyMap(), badCaseHeader); + } + + @Override + public CompletableFuture<Void> closeHandlerAsync() { + closeLatch.trigger(); + return closeFuture; + } + + @Override + protected CompletableFuture<TestResponse> handleRequest( + @Nullable SqlGatewayRestAPIVersion version, + @Nonnull HandlerRequest<TestRequest> request) { + final int id = request.getRequestBody().id; + if (id == 3) { + return FutureUtils.completedExceptionally( + new EndpointNotStartedException("test exception")); + } + return handlerBody.apply(id); + } + } + + private CompletableFuture<TestResponse> sendRequestToTestHandler( + final TestRequest testRequest) { + try { + return restClient.sendRequest( + serverAddress.getHostName(), + serverAddress.getPort(), + badCaseHeader, + EmptyMessageParameters.getInstance(), + testRequest); + } catch (final IOException e) { + throw new RuntimeException(e); + } + } } diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointStatementITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointStatementITCase.java index 019bcdd8abd..4fc378e9029 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointStatementITCase.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointStatementITCase.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.core.testutils.CommonTestUtils; -import org.apache.flink.runtime.rest.RestClient; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.data.RowData; @@ -32,7 +31,6 @@ import org.apache.flink.table.gateway.api.results.ResultSet; import org.apache.flink.table.gateway.api.session.SessionEnvironment; import org.apache.flink.table.gateway.api.session.SessionHandle; import org.apache.flink.table.gateway.api.utils.MockedEndpointVersion; -import org.apache.flink.table.gateway.api.utils.SqlGatewayException; import org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler; import org.apache.flink.table.gateway.rest.header.statement.ExecuteStatementHeaders; import org.apache.flink.table.gateway.rest.header.statement.FetchResultsHeaders; @@ -43,11 +41,12 @@ import org.apache.flink.table.gateway.rest.message.statement.FetchResultsRespons import org.apache.flink.table.gateway.rest.message.statement.FetchResultsTokenParameters; import org.apache.flink.table.gateway.rest.serde.ResultInfo; import org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointExtension; +import org.apache.flink.table.gateway.rest.util.TestingRestClient; import org.apache.flink.table.planner.functions.casting.RowDataToStringConverterImpl; import org.apache.flink.table.utils.DateTimeUtils; -import org.apache.flink.util.ConfigurationException; -import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Order; import org.junit.jupiter.api.extension.RegisterExtension; @@ -61,8 +60,8 @@ import java.util.HashMap; import java.util.Iterator; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executors; +import static org.apache.flink.table.gateway.rest.util.TestingRestClient.getTestingRestClient; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -78,7 +77,7 @@ class SqlGatewayRestEndpointStatementITCase extends AbstractSqlGatewayStatementI private static final SqlGatewayRestEndpointExtension SQL_GATEWAY_REST_ENDPOINT_EXTENSION = new SqlGatewayRestEndpointExtension(SQL_GATEWAY_SERVICE_EXTENSION::getService); - private static final RestClient restClient = getTestRestClient(); + private static TestingRestClient restClient; private static final ExecuteStatementHeaders executeStatementHeaders = ExecuteStatementHeaders.getInstance(); private static SessionMessageParameters sessionMessageParameters; @@ -96,6 +95,16 @@ class SqlGatewayRestEndpointStatementITCase extends AbstractSqlGatewayStatementI private SessionHandle sessionHandle; + @BeforeAll + public static void setup() throws Exception { + restClient = getTestingRestClient(); + } + + @AfterAll + public static void cleanUp() throws Exception { + restClient.shutdown(); + } + @BeforeEach @Override public void before(@TempDir Path temporaryFolder) throws Exception { @@ -193,17 +202,6 @@ class SqlGatewayRestEndpointStatementITCase extends AbstractSqlGatewayStatementI .equals(RuntimeExecutionMode.STREAMING); } - private static RestClient getTestRestClient() { - try { - return new RestClient( - new Configuration(), - Executors.newFixedThreadPool( - 1, new ExecutorThreadFactory("rest-client-thread-pool"))); - } catch (ConfigurationException e) { - throw new SqlGatewayException("Cannot get rest client.", e); - } - } - private class RowDataIterator implements Iterator<RowData> { private final SessionHandle sessionHandle; diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/UtilCaseITTest.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/UtilITCase.java similarity index 98% rename from flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/UtilCaseITTest.java rename to flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/UtilITCase.java index 9ff06a22b63..1e0de661150 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/UtilCaseITTest.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/UtilITCase.java @@ -43,7 +43,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; * Test basic logic of handlers inherited from {@link AbstractSqlGatewayRestHandler} in util related * cases. */ -class UtilCaseITTest extends RestAPIITTestBase { +class UtilITCase extends RestAPIITCaseBase { private static final GetInfoHeaders getInfoHeaders = GetInfoHeaders.getInstance(); private static final EmptyRequestBody emptyRequestBody = EmptyRequestBody.getInstance(); diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/TestingRestClient.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/TestingRestClient.java new file mode 100644 index 00000000000..41ac76f2a1c --- /dev/null +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/TestingRestClient.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.rest.util; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.rest.RestClient; +import org.apache.flink.util.ConfigurationException; +import org.apache.flink.util.ExecutorUtils; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +/** Utility for setting up a rest client based on {@link RestClient} with default settings. */ +public class TestingRestClient extends RestClient { + + private final ExecutorService executorService; + + private TestingRestClient(ExecutorService executorService) throws ConfigurationException { + super(new Configuration(), executorService); + this.executorService = executorService; + } + + public static TestingRestClient getTestingRestClient() throws Exception { + return new TestingRestClient( + Executors.newFixedThreadPool( + 1, new ExecutorThreadFactory("rest-client-thread-pool"))); + } + + public void shutdown() throws Exception { + ExecutorUtils.gracefulShutdown(1, TimeUnit.SECONDS, executorService); + super.closeAsync().get(); + } +} diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/TestingSqlGatewayRestEndpoint.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/TestingSqlGatewayRestEndpoint.java similarity index 85% rename from flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/TestingSqlGatewayRestEndpoint.java rename to flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/TestingSqlGatewayRestEndpoint.java index 9fef8b86e87..a2424dd41b1 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/TestingSqlGatewayRestEndpoint.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/TestingSqlGatewayRestEndpoint.java @@ -16,12 +16,13 @@ * limitations under the License. */ -package org.apache.flink.table.gateway.rest; +package org.apache.flink.table.gateway.rest.util; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; import org.apache.flink.table.gateway.api.SqlGatewayService; +import org.apache.flink.table.gateway.rest.SqlGatewayRestEndpoint; import org.apache.flink.util.ConfigurationException; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; @@ -35,9 +36,10 @@ import java.util.concurrent.CompletableFuture; * Utility for setting up a rest server based on {@link SqlGatewayRestEndpoint} with a given set of * handlers. */ -class TestingSqlGatewayRestEndpoint extends SqlGatewayRestEndpoint { +public class TestingSqlGatewayRestEndpoint extends SqlGatewayRestEndpoint { - static Builder builder(Configuration configuration, SqlGatewayService sqlGatewayService) { + public static Builder builder( + Configuration configuration, SqlGatewayService sqlGatewayService) { return new Builder(configuration, sqlGatewayService); } @@ -45,7 +47,7 @@ class TestingSqlGatewayRestEndpoint extends SqlGatewayRestEndpoint { * TestSqlGatewayRestEndpoint.Builder is a utility class for instantiating a * TestSqlGatewayRestEndpoint. */ - static class Builder { + public static class Builder { private final Configuration configuration; private final List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = @@ -57,17 +59,17 @@ class TestingSqlGatewayRestEndpoint extends SqlGatewayRestEndpoint { this.sqlGatewayService = sqlGatewayService; } - Builder withHandler( + public Builder withHandler( RestHandlerSpecification messageHeaders, ChannelInboundHandler handler) { this.handlers.add(Tuple2.of(messageHeaders, handler)); return this; } - TestingSqlGatewayRestEndpoint build() throws IOException, ConfigurationException { + public TestingSqlGatewayRestEndpoint build() throws IOException, ConfigurationException { return new TestingSqlGatewayRestEndpoint(configuration, handlers, sqlGatewayService); } - TestingSqlGatewayRestEndpoint buildAndStart() throws Exception { + public TestingSqlGatewayRestEndpoint buildAndStart() throws Exception { TestingSqlGatewayRestEndpoint serverEndpoint = build(); serverEndpoint.start(); diff --git a/flink-table/flink-sql-gateway/src/test/resources/sql_gateway_rest_api_v2.snapshot b/flink-table/flink-sql-gateway/src/test/resources/sql_gateway_rest_api_v2.snapshot new file mode 100644 index 00000000000..1ed5ab30ab3 --- /dev/null +++ b/flink-table/flink-sql-gateway/src/test/resources/sql_gateway_rest_api_v2.snapshot @@ -0,0 +1,357 @@ +{ + "calls" : [ { + "url" : "/api_versions", + "method" : "GET", + "status-code" : "200 OK", + "file-upload" : false, + "path-parameters" : { + "pathParameters" : [ ] + }, + "query-parameters" : { + "queryParameters" : [ ] + }, + "request" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody" + }, + "response" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:table:gateway:rest:message:util:GetApiVersionResponseBody", + "properties" : { + "versions" : { + "type" : "array", + "items" : { + "type" : "string" + } + } + } + } + }, { + "url" : "/info", + "method" : "GET", + "status-code" : "200 OK", + "file-upload" : false, + "path-parameters" : { + "pathParameters" : [ ] + }, + "query-parameters" : { + "queryParameters" : [ ] + }, + "request" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody" + }, + "response" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:table:gateway:rest:message:util:GetInfoResponseBody", + "properties" : { + "productName" : { + "type" : "string" + }, + "version" : { + "type" : "string" + } + } + } + }, { + "url" : "/sessions", + "method" : "POST", + "status-code" : "200 OK", + "file-upload" : false, + "path-parameters" : { + "pathParameters" : [ ] + }, + "query-parameters" : { + "queryParameters" : [ ] + }, + "request" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:table:gateway:rest:message:session:OpenSessionRequestBody", + "properties" : { + "sessionName" : { + "type" : "string" + }, + "properties" : { + "type" : "object", + "additionalProperties" : { + "type" : "string" + } + } + } + }, + "response" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:table:gateway:rest:message:session:OpenSessionResponseBody", + "properties" : { + "sessionHandle" : { + "type" : "string" + } + } + } + }, { + "url" : "/sessions/:session_handle", + "method" : "DELETE", + "status-code" : "200 OK", + "file-upload" : false, + "path-parameters" : { + "pathParameters" : [ { + "key" : "session_handle" + } ] + }, + "query-parameters" : { + "queryParameters" : [ ] + }, + "request" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody" + }, + "response" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:table:gateway:rest:message:session:CloseSessionResponseBody", + "properties" : { + "status" : { + "type" : "string" + } + } + } + }, { + "url" : "/sessions/:session_handle", + "method" : "GET", + "status-code" : "200 OK", + "file-upload" : false, + "path-parameters" : { + "pathParameters" : [ { + "key" : "session_handle" + } ] + }, + "query-parameters" : { + "queryParameters" : [ ] + }, + "request" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody" + }, + "response" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:table:gateway:rest:message:session:GetSessionConfigResponseBody", + "properties" : { + "properties" : { + "type" : "object", + "additionalProperties" : { + "type" : "string" + } + } + } + } + }, { + "url" : "/sessions/:session_handle/configure-session", + "method" : "POST", + "status-code" : "200 OK", + "file-upload" : false, + "path-parameters" : { + "pathParameters" : [ { + "key" : "session_handle" + } ] + }, + "query-parameters" : { + "queryParameters" : [ ] + }, + "request" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:table:gateway:rest:message:session:ConfigureSessionRequestBody", + "properties" : { + "statement" : { + "type" : "string" + }, + "executionTimeout" : { + "type" : "integer" + } + } + }, + "response" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyResponseBody" + } + }, { + "url" : "/sessions/:session_handle/heartbeat", + "method" : "POST", + "status-code" : "200 OK", + "file-upload" : false, + "path-parameters" : { + "pathParameters" : [ { + "key" : "session_handle" + } ] + }, + "query-parameters" : { + "queryParameters" : [ ] + }, + "request" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody" + }, + "response" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyResponseBody" + } + }, { + "url" : "/sessions/:session_handle/operations/:operation_handle/cancel", + "method" : "POST", + "status-code" : "200 OK", + "file-upload" : false, + "path-parameters" : { + "pathParameters" : [ { + "key" : "session_handle" + }, { + "key" : "operation_handle" + } ] + }, + "query-parameters" : { + "queryParameters" : [ ] + }, + "request" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody" + }, + "response" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:table:gateway:rest:message:operation:OperationStatusResponseBody", + "properties" : { + "status" : { + "type" : "string" + } + } + } + }, { + "url" : "/sessions/:session_handle/operations/:operation_handle/close", + "method" : "DELETE", + "status-code" : "200 OK", + "file-upload" : false, + "path-parameters" : { + "pathParameters" : [ { + "key" : "session_handle" + }, { + "key" : "operation_handle" + } ] + }, + "query-parameters" : { + "queryParameters" : [ ] + }, + "request" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody" + }, + "response" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:table:gateway:rest:message:operation:OperationStatusResponseBody", + "properties" : { + "status" : { + "type" : "string" + } + } + } + }, { + "url" : "/sessions/:session_handle/operations/:operation_handle/result/:token", + "method" : "GET", + "status-code" : "200 OK", + "file-upload" : false, + "path-parameters" : { + "pathParameters" : [ { + "key" : "session_handle" + }, { + "key" : "operation_handle" + }, { + "key" : "token" + } ] + }, + "query-parameters" : { + "queryParameters" : [ ] + }, + "request" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody" + }, + "response" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:table:gateway:rest:message:statement:FetchResultsResponseBody", + "properties" : { + "results" : { + "type" : "any" + }, + "resultType" : { + "type" : "string" + }, + "nextResultUri" : { + "type" : "string" + } + } + } + }, { + "url" : "/sessions/:session_handle/operations/:operation_handle/status", + "method" : "GET", + "status-code" : "200 OK", + "file-upload" : false, + "path-parameters" : { + "pathParameters" : [ { + "key" : "session_handle" + }, { + "key" : "operation_handle" + } ] + }, + "query-parameters" : { + "queryParameters" : [ ] + }, + "request" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody" + }, + "response" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:table:gateway:rest:message:operation:OperationStatusResponseBody", + "properties" : { + "status" : { + "type" : "string" + } + } + } + }, { + "url" : "/sessions/:session_handle/statements", + "method" : "POST", + "status-code" : "200 OK", + "file-upload" : false, + "path-parameters" : { + "pathParameters" : [ { + "key" : "session_handle" + } ] + }, + "query-parameters" : { + "queryParameters" : [ ] + }, + "request" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:table:gateway:rest:message:statement:ExecuteStatementRequestBody", + "properties" : { + "statement" : { + "type" : "string" + }, + "executionTimeout" : { + "type" : "integer" + }, + "executionConfig" : { + "type" : "object", + "additionalProperties" : { + "type" : "string" + } + } + } + }, + "response" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:table:gateway:rest:message:statement:ExecuteStatementResponseBody", + "properties" : { + "operationHandle" : { + "type" : "string" + } + } + } + } ] +}