This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b48cddf12a3 [FLINK-30416][sql-gateway] Add configureSession REST API in the SQL Gateway (#21525)
b48cddf12a3 is described below

commit b48cddf12a3171cf1b6ddaa675a6b81eea1254bf
Author: yuzelin <33053040+yuze...@users.noreply.github.com>
AuthorDate: Wed Jan 4 16:21:27 2023 +0800

    [FLINK-30416][sql-gateway] Add configureSession REST API in the SQL Gateway (#21525)
    
    This closes #21525
---
 .../table/gateway/rest/SqlGatewayRestEndpoint.java |   8 +
 .../handler/session/ConfigureSessionHandler.java   |  65 ++++
 .../rest/header/SqlGatewayMessageHeaders.java      |   7 +-
 .../header/session/ConfigureSessionHeaders.java    | 101 ++++++
 .../session/ConfigureSessionRequestBody.java       |  57 ++++
 .../rest/util/SqlGatewayRestAPIVersion.java        |  31 +-
 ...CaseITTest.java => OperationRelatedITCase.java} |   2 +-
 ...stAPIITTestBase.java => RestAPIITCaseBase.java} |  23 +-
 ...onCaseITTest.java => SessionRelatedITCase.java} |  71 ++--
 .../gateway/rest/SqlGatewayRestEndpointITCase.java | 339 +++++++++----------
 .../SqlGatewayRestEndpointStatementITCase.java     |  32 +-
 .../rest/{UtilCaseITTest.java => UtilITCase.java}  |   2 +-
 .../table/gateway/rest/util/TestingRestClient.java |  51 +++
 .../{ => util}/TestingSqlGatewayRestEndpoint.java  |  16 +-
 .../resources/sql_gateway_rest_api_v2.snapshot     | 357 +++++++++++++++++++++
 15 files changed, 933 insertions(+), 229 deletions(-)

diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpoint.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpoint.java
index 41afe0709f3..3d063d789ea 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpoint.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpoint.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.gateway.rest.handler.operation.CancelOperationHand
 import org.apache.flink.table.gateway.rest.handler.operation.CloseOperationHandler;
 import org.apache.flink.table.gateway.rest.handler.operation.GetOperationStatusHandler;
 import org.apache.flink.table.gateway.rest.handler.session.CloseSessionHandler;
+import org.apache.flink.table.gateway.rest.handler.session.ConfigureSessionHandler;
 import org.apache.flink.table.gateway.rest.handler.session.GetSessionConfigHandler;
 import org.apache.flink.table.gateway.rest.handler.session.OpenSessionHandler;
 import org.apache.flink.table.gateway.rest.handler.session.TriggerSessionHeartbeatHandler;
@@ -39,6 +40,7 @@ import org.apache.flink.table.gateway.rest.header.operation.CancelOperationHeade
 import org.apache.flink.table.gateway.rest.header.operation.CloseOperationHeaders;
 import org.apache.flink.table.gateway.rest.header.operation.GetOperationStatusHeaders;
 import org.apache.flink.table.gateway.rest.header.session.CloseSessionHeaders;
+import org.apache.flink.table.gateway.rest.header.session.ConfigureSessionHeaders;
 import org.apache.flink.table.gateway.rest.header.session.GetSessionConfigHeaders;
 import org.apache.flink.table.gateway.rest.header.session.OpenSessionHeaders;
 import org.apache.flink.table.gateway.rest.header.session.TriggerSessionHeartbeatHeaders;
@@ -90,6 +92,12 @@ public class SqlGatewayRestEndpoint extends RestServerEndpoint implements SqlGat
                         service, responseHeaders, CloseSessionHeaders.getInstance());
         handlers.add(Tuple2.of(CloseSessionHeaders.getInstance(), closeSessionHandler));
 
+        // Configure session
+        ConfigureSessionHandler configureSessionHandler =
+                new ConfigureSessionHandler(
+                        service, responseHeaders, ConfigureSessionHeaders.getINSTANCE());
+        handlers.add(Tuple2.of(ConfigureSessionHeaders.getINSTANCE(), configureSessionHandler));
+
         // Get session configuration
         GetSessionConfigHandler getSessionConfigHandler =
                 new GetSessionConfigHandler(
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/session/ConfigureSessionHandler.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/session/ConfigureSessionHandler.java
new file mode 100644
index 00000000000..0b69e73ccde
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/session/ConfigureSessionHandler.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest.handler.session;
+
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.table.gateway.api.SqlGatewayService;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler;
+import org.apache.flink.table.gateway.rest.message.session.ConfigureSessionRequestBody;
+import org.apache.flink.table.gateway.rest.message.session.SessionHandleIdPathParameter;
+import org.apache.flink.table.gateway.rest.message.session.SessionMessageParameters;
+import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion;
+
+import javax.annotation.Nonnull;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/** Handler to configure a session with statement. */
+public class ConfigureSessionHandler
+        extends AbstractSqlGatewayRestHandler<
+                ConfigureSessionRequestBody, EmptyResponseBody, SessionMessageParameters> {
+
+    public ConfigureSessionHandler(
+            SqlGatewayService service,
+            Map<String, String> responseHeaders,
+            MessageHeaders<ConfigureSessionRequestBody, EmptyResponseBody, SessionMessageParameters>
+                    messageHeaders) {
+        super(service, responseHeaders, messageHeaders);
+    }
+
+    @Override
+    protected CompletableFuture<EmptyResponseBody> handleRequest(
+            SqlGatewayRestAPIVersion version,
+            @Nonnull HandlerRequest<ConfigureSessionRequestBody> request)
+            throws RestHandlerException {
+        SessionHandle sessionHandle = request.getPathParameter(SessionHandleIdPathParameter.class);
+        String statement = request.getRequestBody().getStatement();
+        Long timeout = request.getRequestBody().getTimeout();
+        timeout = timeout == null ? 0L : timeout;
+
+        service.configureSession(sessionHandle, statement, timeout);
+
+        return CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
+    }
+}
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/SqlGatewayMessageHeaders.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/SqlGatewayMessageHeaders.java
index 92d35d164ba..8fbf3db4c11 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/SqlGatewayMessageHeaders.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/SqlGatewayMessageHeaders.java
@@ -25,8 +25,9 @@ import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
 import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion;
 
+import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
+import java.util.stream.Collectors;
 
 /**
  * This class links {@link RequestBody}s to {@link ResponseBody}s types and contains meta-data
@@ -44,6 +45,8 @@ public interface SqlGatewayMessageHeaders<
 
     @Override
     default Collection<? extends RestAPIVersion<?>> getSupportedAPIVersions() {
-        return Collections.singleton(SqlGatewayRestAPIVersion.V1);
+        return Arrays.stream(SqlGatewayRestAPIVersion.values())
+                .filter(SqlGatewayRestAPIVersion::isStableVersion)
+                .collect(Collectors.toList());
     }
 }
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/session/ConfigureSessionHeaders.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/session/ConfigureSessionHeaders.java
new file mode 100644
index 00000000000..a11b65b09cf
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/session/ConfigureSessionHeaders.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest.header.session;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
+import org.apache.flink.table.gateway.rest.header.SqlGatewayMessageHeaders;
+import org.apache.flink.table.gateway.rest.message.session.ConfigureSessionRequestBody;
+import org.apache.flink.table.gateway.rest.message.session.SessionHandleIdPathParameter;
+import org.apache.flink.table.gateway.rest.message.session.SessionMessageParameters;
+import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/** Message headers for configuring a session. */
+public class ConfigureSessionHeaders
+        implements SqlGatewayMessageHeaders<
+                ConfigureSessionRequestBody, EmptyResponseBody, SessionMessageParameters> {
+
+    private static final ConfigureSessionHeaders INSTANCE = new ConfigureSessionHeaders();
+
+    private static final String URL =
+            "/sessions/:" + SessionHandleIdPathParameter.KEY + "/configure-session";
+
+    @Override
+    public Class<EmptyResponseBody> getResponseClass() {
+        return EmptyResponseBody.class;
+    }
+
+    @Override
+    public HttpResponseStatus getResponseStatusCode() {
+        return HttpResponseStatus.OK;
+    }
+
+    @Override
+    public String getDescription() {
+        return "Configures the session with the statement which could be:\n"
+                + "CREATE TABLE, DROP TABLE, ALTER TABLE, "
+                + "CREATE DATABASE, DROP DATABASE, ALTER DATABASE, "
+                + "CREATE FUNCTION, DROP FUNCTION, ALTER FUNCTION, "
+                + "CREATE CATALOG, DROP CATALOG, "
+                + "USE CATALOG, USE [CATALOG.]DATABASE, "
+                + "CREATE VIEW, DROP VIEW, "
+                + "LOAD MODULE, UNLOAD MODULE, USE MODULE, "
+                + "ADD JAR.";
+    }
+
+    @Override
+    public Class<ConfigureSessionRequestBody> getRequestClass() {
+        return ConfigureSessionRequestBody.class;
+    }
+
+    @Override
+    public SessionMessageParameters getUnresolvedMessageParameters() {
+        return new SessionMessageParameters();
+    }
+
+    @Override
+    public HttpMethodWrapper getHttpMethod() {
+        return HttpMethodWrapper.POST;
+    }
+
+    @Override
+    public String getTargetRestEndpointURL() {
+        return URL;
+    }
+
+    @Override
+    public Collection<? extends RestAPIVersion<?>> getSupportedAPIVersions() {
+        return Collections.singleton(SqlGatewayRestAPIVersion.V2);
+    }
+
+    public static ConfigureSessionHeaders getINSTANCE() {
+        return INSTANCE;
+    }
+
+    @Override
+    public String operationId() {
+        return "configureSession";
+    }
+}
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/session/ConfigureSessionRequestBody.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/session/ConfigureSessionRequestBody.java
new file mode 100644
index 00000000000..4b16ffa1413
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/session/ConfigureSessionRequestBody.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest.message.session;
+
+import org.apache.flink.runtime.rest.messages.RequestBody;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+
+/** {@link RequestBody} for configuring a session. */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class ConfigureSessionRequestBody implements RequestBody {
+
+    private static final String FIELD_NAME_STATEMENT = "statement";
+    private static final String FIELD_NAME_EXECUTION_TIMEOUT = "executionTimeout";
+
+    @JsonProperty(FIELD_NAME_STATEMENT)
+    private final String statement;
+
+    @JsonProperty(FIELD_NAME_EXECUTION_TIMEOUT)
+    @Nullable
+    private final Long timeout;
+
+    public ConfigureSessionRequestBody(
+            @JsonProperty(FIELD_NAME_STATEMENT) String statement,
+            @Nullable @JsonProperty(FIELD_NAME_EXECUTION_TIMEOUT) Long timeout) {
+        this.statement = statement;
+        this.timeout = timeout;
+    }
+
+    public String getStatement() {
+        return statement;
+    }
+
+    @Nullable
+    public Long getTimeout() {
+        return timeout;
+    }
+}
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestAPIVersion.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestAPIVersion.java
index cc5706f5d12..2b84aef323a 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestAPIVersion.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestAPIVersion.java
@@ -20,6 +20,11 @@ package org.apache.flink.table.gateway.rest.util;
 
 import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
 import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * An enum for all versions of the Sql Gateway REST API.
@@ -34,8 +39,16 @@ public enum SqlGatewayRestAPIVersion
         implements RestAPIVersion<SqlGatewayRestAPIVersion>, EndpointVersion {
     // The bigger the ordinal(its position in enum declaration), the higher the level of the
     // version.
+
+    // V0 is just for test
     V0(false, false),
-    V1(true, true);
+
+    // V1 introduces basic APIs for rest endpoint
+    V1(false, true),
+
+    // V2 adds support for configuring Session and allows to serialize the RowData with PLAIN_TEXT
+    // or JSON format.
+    V2(true, true);
 
     private final boolean isDefaultVersion;
 
@@ -90,7 +103,21 @@ public enum SqlGatewayRestAPIVersion
         try {
             return valueOf(uri.substring(1, slashIndex).toUpperCase());
         } catch (Exception e) {
-            return V1;
+            return getDefaultVersion();
         }
     }
+
+    public static SqlGatewayRestAPIVersion getDefaultVersion() {
+        List<SqlGatewayRestAPIVersion> versions =
+                Arrays.stream(SqlGatewayRestAPIVersion.values())
+                        .filter(SqlGatewayRestAPIVersion::isDefaultVersion)
+                        .collect(Collectors.toList());
+        Preconditions.checkState(
+                versions.size() == 1,
+                String.format(
+                        "Only one default version of Sql Gateway Rest API, but found %s.",
+                        versions.size()));
+
+        return versions.get(0);
+    }
 }
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/OperationCaseITTest.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/OperationRelatedITCase.java
similarity index 99%
rename from flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/OperationCaseITTest.java
rename to flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/OperationRelatedITCase.java
index ab4c1803294..b5da70444e5 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/OperationCaseITTest.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/OperationRelatedITCase.java
@@ -52,7 +52,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
  * Test basic logic of handlers inherited from {@link AbstractSqlGatewayRestHandler} in operation
  * related cases.
  */
-class OperationCaseITTest extends RestAPIITTestBase {
+class OperationRelatedITCase extends RestAPIITCaseBase {
 
     private static final String sessionName = "test";
     private static final Map<String, String> properties = new HashMap<>();
diff --git 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/RestAPIITTestBase.java
 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/RestAPIITCaseBase.java
similarity index 81%
rename from 
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/RestAPIITTestBase.java
rename to 
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/RestAPIITCaseBase.java
index ceef72b3340..2970d2f25a1 100644
--- 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/RestAPIITTestBase.java
+++ 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/RestAPIITCaseBase.java
@@ -20,36 +20,33 @@ package org.apache.flink.table.gateway.rest;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.rest.RestClient;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.MessageParameters;
 import org.apache.flink.runtime.rest.messages.RequestBody;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.table.gateway.rest.util.TestingRestClient;
 import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
 import org.apache.flink.test.junit5.MiniClusterExtension;
-import org.apache.flink.util.ExecutorUtils;
-import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
-import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Order;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 
 import static 
org.apache.flink.table.gateway.rest.util.RestConfigUtils.getBaseConfig;
 import static 
org.apache.flink.table.gateway.rest.util.RestConfigUtils.getFlinkConfig;
+import static 
org.apache.flink.table.gateway.rest.util.TestingRestClient.getTestingRestClient;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** The base class for Rest API IT test. */
-abstract class RestAPIITTestBase {
+abstract class RestAPIITCaseBase {
 
     @RegisterExtension
     @Order(1)
@@ -60,9 +57,8 @@ abstract class RestAPIITTestBase {
     protected static final SqlGatewayServiceExtension 
SQL_GATEWAY_SERVICE_EXTENSION =
             new 
SqlGatewayServiceExtension(MINI_CLUSTER::getClientConfiguration);
 
-    @Nullable private static RestClient restClient = null;
+    @Nullable private static TestingRestClient restClient = null;
     @Nullable private static String targetAddress = null;
-    @Nullable private static ExecutorService executorService = null;
     @Nullable private static SqlGatewayRestEndpoint sqlGatewayRestEndpoint = 
null;
 
     private static int port = 0;
@@ -75,10 +71,7 @@ abstract class RestAPIITTestBase {
                 new SqlGatewayRestEndpoint(config, 
SQL_GATEWAY_SERVICE_EXTENSION.getService());
         sqlGatewayRestEndpoint.start();
         InetSocketAddress serverAddress = 
checkNotNull(sqlGatewayRestEndpoint.getServerAddress());
-        executorService =
-                Executors.newFixedThreadPool(
-                        1, new 
ExecutorThreadFactory("rest-client-thread-pool"));
-        restClient = new RestClient(new Configuration(), executorService);
+        restClient = getTestingRestClient();
         targetAddress = serverAddress.getHostName();
         port = serverAddress.getPort();
     }
@@ -89,8 +82,6 @@ abstract class RestAPIITTestBase {
         sqlGatewayRestEndpoint.close();
         checkNotNull(restClient);
         restClient.shutdown(Time.seconds(3));
-        checkNotNull(executorService);
-        ExecutorUtils.gracefulShutdown(3, TimeUnit.SECONDS, executorService);
     }
 
     public <
diff --git 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SessionCaseITTest.java
 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SessionRelatedITCase.java
similarity index 80%
rename from 
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SessionCaseITTest.java
rename to 
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SessionRelatedITCase.java
index afc27904911..2cef32f269f 100644
--- 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SessionCaseITTest.java
+++ 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SessionRelatedITCase.java
@@ -25,16 +25,20 @@ import 
org.apache.flink.table.gateway.api.session.SessionHandle;
 import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
 import 
org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler;
 import org.apache.flink.table.gateway.rest.header.session.CloseSessionHeaders;
+import 
org.apache.flink.table.gateway.rest.header.session.ConfigureSessionHeaders;
 import 
org.apache.flink.table.gateway.rest.header.session.GetSessionConfigHeaders;
 import org.apache.flink.table.gateway.rest.header.session.OpenSessionHeaders;
 import 
org.apache.flink.table.gateway.rest.header.session.TriggerSessionHeartbeatHeaders;
 import 
org.apache.flink.table.gateway.rest.message.session.CloseSessionResponseBody;
+import 
org.apache.flink.table.gateway.rest.message.session.ConfigureSessionRequestBody;
 import 
org.apache.flink.table.gateway.rest.message.session.GetSessionConfigResponseBody;
 import 
org.apache.flink.table.gateway.rest.message.session.OpenSessionRequestBody;
 import 
org.apache.flink.table.gateway.rest.message.session.OpenSessionResponseBody;
 import 
org.apache.flink.table.gateway.rest.message.session.SessionMessageParameters;
 import org.apache.flink.table.gateway.service.session.Session;
 
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
@@ -50,13 +54,12 @@ import java.util.concurrent.ExecutionException;
 import static 
org.apache.flink.table.gateway.rest.handler.session.CloseSessionHandler.CLOSE_MESSAGE;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
 
 /**
  * Test basic logic of handlers inherited from {@link 
AbstractSqlGatewayRestHandler} in session
  * related cases.
  */
-class SessionCaseITTest extends RestAPIITTestBase {
+class SessionRelatedITCase extends RestAPIITCaseBase {
 
     private static final String SESSION_NAME = "test";
     private static final Map<String, String> properties = new HashMap<>();
@@ -77,6 +80,33 @@ class SessionCaseITTest extends RestAPIITTestBase {
             CloseSessionHeaders.getInstance();
     private static final EmptyRequestBody emptyRequestBody = 
EmptyRequestBody.getInstance();
 
+    private SessionHandle sessionHandle;
+
+    private SessionMessageParameters sessionMessageParameters;
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        CompletableFuture<OpenSessionResponseBody> response =
+                sendRequest(openSessionHeaders, emptyParameters, 
openSessionRequestBody);
+        String sessionHandleId = response.get().getSessionHandle();
+        assertThat(sessionHandleId).isNotNull();
+
+        sessionHandle = new SessionHandle(UUID.fromString(sessionHandleId));
+        
assertThat(SQL_GATEWAY_SERVICE_EXTENSION.getSessionManager().getSession(sessionHandle))
+                .isNotNull();
+
+        sessionMessageParameters = new SessionMessageParameters(sessionHandle);
+    }
+
+    @AfterEach
+    public void cleanUp() throws Exception {
+        CompletableFuture<CloseSessionResponseBody> response =
+                sendRequest(closeSessionHeaders, sessionMessageParameters, 
emptyRequestBody);
+
+        String status = response.get().getStatus();
+        assertThat(status).isEqualTo(CLOSE_MESSAGE);
+    }
+
     @Test
     void testCreateAndCloseSessions() throws Exception {
         List<SessionHandle> sessionHandles = new ArrayList<>();
@@ -117,16 +147,6 @@ class SessionCaseITTest extends RestAPIITTestBase {
 
     @Test
     void testGetSessionConfiguration() throws Exception {
-        CompletableFuture<OpenSessionResponseBody> response =
-                sendRequest(openSessionHeaders, emptyParameters, 
openSessionRequestBody);
-        String sessionHandleId = response.get().getSessionHandle();
-        assertThat(sessionHandleId).isNotNull();
-        SessionHandle sessionHandle = new 
SessionHandle(UUID.fromString(sessionHandleId));
-        
assertThat(SQL_GATEWAY_SERVICE_EXTENSION.getSessionManager().getSession(sessionHandle))
-                .isNotNull();
-
-        SessionMessageParameters sessionMessageParameters =
-                new SessionMessageParameters(sessionHandle);
         CompletableFuture<GetSessionConfigResponseBody> future =
                 sendRequest(
                         GetSessionConfigHeaders.getInstance(),
@@ -140,19 +160,12 @@ class SessionCaseITTest extends RestAPIITTestBase {
 
     @Test
     void testTouchSession() throws Exception {
-        CompletableFuture<OpenSessionResponseBody> response =
-                sendRequest(openSessionHeaders, emptyParameters, 
openSessionRequestBody);
-        String sessionHandleId = response.get().getSessionHandle();
-        assertNotNull(sessionHandleId);
-        SessionHandle sessionHandle = new 
SessionHandle(UUID.fromString(sessionHandleId));
         Session session =
                 
SQL_GATEWAY_SERVICE_EXTENSION.getSessionManager().getSession(sessionHandle);
         assertThat(session).isNotNull();
 
         long lastAccessTime = session.getLastAccessTime();
 
-        SessionMessageParameters sessionMessageParameters =
-                new SessionMessageParameters(sessionHandle);
         CompletableFuture<EmptyResponseBody> future =
                 sendRequest(
                         TriggerSessionHeartbeatHeaders.getInstance(),
@@ -161,4 +174,24 @@ class SessionCaseITTest extends RestAPIITTestBase {
         future.get();
         assertThat(session.getLastAccessTime() > lastAccessTime).isTrue();
     }
+
+    @Test
+    void testConfigureSession() throws Exception {
+        ConfigureSessionRequestBody configureSessionRequestBody =
+                new ConfigureSessionRequestBody("set 'test' = 'configure';", 
-1L);
+
+        CompletableFuture<EmptyResponseBody> response =
+                sendRequest(
+                        ConfigureSessionHeaders.getINSTANCE(),
+                        sessionMessageParameters,
+                        configureSessionRequestBody);
+        response.get();
+
+        assertThat(
+                        SQL_GATEWAY_SERVICE_EXTENSION
+                                .getSessionManager()
+                                .getSession(sessionHandle)
+                                .getSessionConfig())
+                .containsEntry("test", "configure");
+    }
 }
diff --git 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointITCase.java
 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointITCase.java
index 3c2a8776a33..e52a42ee354 100644
--- 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointITCase.java
+++ 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointITCase.java
@@ -22,24 +22,25 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.core.testutils.BlockerSync;
+import org.apache.flink.core.testutils.FlinkAssertions;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.rest.HttpMethodWrapper;
-import org.apache.flink.runtime.rest.RestClient;
-import org.apache.flink.runtime.rest.RestServerEndpoint;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.RequestBody;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.rest.util.RestClientException;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
 import org.apache.flink.runtime.rpc.exceptions.EndpointNotStartedException;
 import org.apache.flink.table.gateway.api.SqlGatewayService;
 import org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler;
 import org.apache.flink.table.gateway.rest.header.SqlGatewayMessageHeaders;
 import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion;
+import org.apache.flink.table.gateway.rest.util.TestingRestClient;
+import org.apache.flink.table.gateway.rest.util.TestingSqlGatewayRestEndpoint;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 import org.apache.flink.util.concurrent.FutureUtils;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
@@ -49,44 +50,49 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
 import okhttp3.OkHttpClient;
 import okhttp3.Request;
 import okhttp3.Response;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Objects;
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.table.gateway.rest.util.RestConfigUtils.getBaseConfig;
 import static org.apache.flink.table.gateway.rest.util.RestConfigUtils.getFlinkConfig;
+import static org.apache.flink.table.gateway.rest.util.TestingRestClient.getTestingRestClient;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** IT cases for {@link SqlGatewayRestEndpoint}. */
 class SqlGatewayRestEndpointITCase {
 
-    private static final SqlGatewayService service = null;
+    private static final SqlGatewayService SERVICE = null;
 
-    private static RestServerEndpoint serverEndpoint;
-    private static RestClient restClient;
+    private static SqlGatewayRestEndpoint serverEndpoint;
+    private static TestingRestClient restClient;
     private static InetSocketAddress serverAddress;
 
-    private static TestBadCaseHandler testHandler;
-    private static TestVersionSelectionHeaders1 header1;
-    private static TestVersionSelectionHeaders2 header2;
     private static TestBadCaseHeaders badCaseHeader;
-    private static TestVersionHandler testVersionHandler1;
-    private static TestVersionHandler testVersionHandler2;
+    private static TestBadCaseHandler testHandler;
+
+    private static TestVersionSelectionHeaders0 header0;
+    private static TestVersionSelectionHeadersNot0 headerNot0;
+
+    private static TestVersionHandler testVersionHandler0;
+    private static TestVersionHandler testVersionHandlerNot0;
 
     private static Configuration config;
     private static final Time timeout = Time.seconds(10L);
@@ -94,43 +100,38 @@ class SqlGatewayRestEndpointITCase {
     @BeforeEach
     void setup() throws Exception {
         // Test version cases
-        header1 = new TestVersionSelectionHeaders1();
-        header2 = new TestVersionSelectionHeaders2();
-        testVersionHandler1 = new TestVersionHandler(service, header1);
-        testVersionHandler2 = new TestVersionHandler(service, header2);
+        header0 = new TestVersionSelectionHeaders0();
+        headerNot0 = new TestVersionSelectionHeadersNot0();
+        testVersionHandler0 = new TestVersionHandler(SERVICE, header0);
+        testVersionHandlerNot0 = new TestVersionHandler(SERVICE, headerNot0);
 
         // Test exception cases
         badCaseHeader = new TestBadCaseHeaders();
-        testHandler = new TestBadCaseHandler(service);
+        testHandler = new TestBadCaseHandler(SERVICE);
 
         // Init
         final String address = InetAddress.getLoopbackAddress().getHostAddress();
         config = getBaseConfig(getFlinkConfig(address, address, "0"));
         serverEndpoint =
-                TestingSqlGatewayRestEndpoint.builder(config, service)
+                TestingSqlGatewayRestEndpoint.builder(config, SERVICE)
                         .withHandler(badCaseHeader, testHandler)
-                        .withHandler(header1, testVersionHandler1)
-                        .withHandler(header2, testVersionHandler2)
+                        .withHandler(header0, testVersionHandler0)
+                        .withHandler(headerNot0, testVersionHandlerNot0)
                         .buildAndStart();
 
-        restClient =
-                new RestClient(
-                        config,
-                        Executors.newFixedThreadPool(
-                                1, new ExecutorThreadFactory("rest-client-thread-pool")));
+        restClient = getTestingRestClient();
         serverAddress = serverEndpoint.getServerAddress();
     }
 
     @AfterEach
     void stop() throws Exception {
-
         if (restClient != null) {
-            restClient.shutdown(timeout);
+            restClient.shutdown();
             restClient = null;
         }
 
         if (serverEndpoint != null) {
-            serverEndpoint.closeAsync().get(timeout.getSize(), timeout.getUnit());
+            serverEndpoint.stop();
             serverEndpoint = null;
         }
     }
@@ -138,74 +139,95 @@ class SqlGatewayRestEndpointITCase {
     /** Test that {@link SqlGatewayMessageHeaders} can identify the version correctly. */
     @Test
     void testSqlGatewayMessageHeaders() throws Exception {
-        // The header only support V1, but send request by V0
+        // The header can't support V0, but sends request by V0
         assertThatThrownBy(
                         () ->
                                 restClient.sendRequest(
                                         serverAddress.getHostName(),
                                         serverAddress.getPort(),
-                                        header2,
+                                        headerNot0,
                                         EmptyMessageParameters.getInstance(),
                                         EmptyRequestBody.getInstance(),
                                         Collections.emptyList(),
                                         SqlGatewayRestAPIVersion.V0))
-                .isInstanceOf(IllegalArgumentException.class);
-
-        // The header only support V1, send request by V1
+                .satisfies(
+                        FlinkAssertions.anyCauseMatches(
+                                IllegalArgumentException.class,
+                                String.format(
+                                        "The requested version V0 is not supported by the request (method=%s URL=%s). Supported versions are: %s.",
+                                        headerNot0.getHttpMethod(),
+                                        headerNot0.getTargetRestEndpointURL(),
+                                        headerNot0.getSupportedAPIVersions().stream()
+                                                .map(RestAPIVersion::getURLVersionPrefix)
+                                                .collect(Collectors.joining(",")))));
+
+        // The header only supports V0, sends request by V0
         CompletableFuture<TestResponse> specifiedVersionResponse =
                 restClient.sendRequest(
                         serverAddress.getHostName(),
                         serverAddress.getPort(),
-                        header2,
+                        header0,
                         EmptyMessageParameters.getInstance(),
                         EmptyRequestBody.getInstance(),
                         Collections.emptyList(),
-                        SqlGatewayRestAPIVersion.V1);
+                        SqlGatewayRestAPIVersion.V0);
 
-        TestResponse testResponse1 = specifiedVersionResponse.get(5, TimeUnit.SECONDS);
-        assertThat(testResponse1.getStatus()).isEqualTo("V1");
+        TestResponse testResponse0 =
+                specifiedVersionResponse.get(timeout.getSize(), timeout.getUnit());
+        assertThat(testResponse0.getStatus()).isEqualTo("V0");
 
-        // The header only support V1, send request by latest version V1
-        CompletableFuture<TestResponse> unspecifiedVersionResponse =
+        // The header only supports V0, lets the client get the version
+        CompletableFuture<TestResponse> unspecifiedVersionResponse0 =
                 restClient.sendRequest(
                         serverAddress.getHostName(),
                         serverAddress.getPort(),
-                        header2,
+                        header0,
                         EmptyMessageParameters.getInstance(),
                         EmptyRequestBody.getInstance(),
                         Collections.emptyList());
 
-        TestResponse testResponse2 = unspecifiedVersionResponse.get(5, TimeUnit.SECONDS);
-        assertThat(testResponse2.getStatus()).isEqualTo("V1");
-    }
+        TestResponse testResponse1 =
+                unspecifiedVersionResponse0.get(timeout.getSize(), timeout.getUnit());
+        assertThat(testResponse1.getStatus()).isEqualTo("V0");
 
-    /** Test that requests of different version are routed to correct handlers. */
-    @Test
-    void testVersionSelection() throws Exception {
-        CompletableFuture<TestResponse> version1Response =
+        // The header supports multiple versions, lets the client get the latest version as default
+        CompletableFuture<TestResponse> unspecifiedVersionResponse1 =
                 restClient.sendRequest(
                         serverAddress.getHostName(),
                         serverAddress.getPort(),
-                        header1,
+                        headerNot0,
                         EmptyMessageParameters.getInstance(),
                         EmptyRequestBody.getInstance(),
-                        Collections.emptyList(),
-                        SqlGatewayRestAPIVersion.V0);
+                        Collections.emptyList());
 
-        TestResponse testResponse = version1Response.get(5, TimeUnit.SECONDS);
-        assertThat(testResponse.getStatus()).isEqualTo("V0");
+        TestResponse testResponse2 =
+                unspecifiedVersionResponse1.get(timeout.getSize(), timeout.getUnit());
+        assertThat(testResponse2.getStatus())
+                .isEqualTo(
+                        RestAPIVersion.getLatestVersion(headerNot0.getSupportedAPIVersions())
+                                .name());
+    }
 
-        CompletableFuture<TestResponse> version2Response =
-                restClient.sendRequest(
-                        serverAddress.getHostName(),
-                        serverAddress.getPort(),
-                        header2,
-                        EmptyMessageParameters.getInstance(),
-                        EmptyRequestBody.getInstance(),
-                        Collections.emptyList(),
-                        SqlGatewayRestAPIVersion.V1);
-        TestResponse testResponse2 = version2Response.get(5, TimeUnit.SECONDS);
-        assertThat(testResponse2.getStatus()).isEqualTo("V1");
+    /** Test that requests of different version are routed to correct handlers. */
+    @Test
+    void testVersionSelection() throws Exception {
+        for (SqlGatewayRestAPIVersion version : SqlGatewayRestAPIVersion.values()) {
+            if (version != SqlGatewayRestAPIVersion.V0) {
+                CompletableFuture<TestResponse> versionResponse =
+                        restClient.sendRequest(
+                                serverAddress.getHostName(),
+                                serverAddress.getPort(),
+                                headerNot0,
+                                EmptyMessageParameters.getInstance(),
+                                EmptyRequestBody.getInstance(),
+                                Collections.emptyList(),
+                                version);
+
+                TestResponse testResponse =
+                        versionResponse.get(timeout.getSize(), timeout.getUnit());
+                assertThat(testResponse.getStatus()).isEqualTo(version.name());
+            }
+        }
     }
 
     /**
@@ -219,12 +241,13 @@ class SqlGatewayRestEndpointITCase {
         OkHttpClient client = new OkHttpClient();
         final Request request =
                 new Request.Builder()
-                        .url(serverEndpoint.getRestBaseUrl() + header1.getTargetRestEndpointURL())
+                        .url(serverEndpoint.getRestBaseUrl() + header0.getTargetRestEndpointURL())
                         .build();
 
         final Response response = client.newCall(request).execute();
         assert response.body() != null;
-        assertThat(response.body().string()).contains("V1");
+        assertThat(response.body().string())
+                .contains(SqlGatewayRestAPIVersion.getDefaultVersion().name());
     }
 
     /**
@@ -254,13 +277,13 @@ class SqlGatewayRestEndpointITCase {
         // send second request and verify response
         final CompletableFuture<TestResponse> response2 =
                 sendRequestToTestHandler(new TestRequest(2));
-        assertThat(response2.get().status).isEqualTo("2");
+        assertThat(response2.get().getStatus()).isEqualTo("2");
 
         // wake up blocked handler
         sync.releaseBlocker();
 
         // verify response to first request
-        assertThat(response1.get().status).isEqualTo("1");
+        assertThat(response1.get().getStatus()).isEqualTo("1");
     }
 
     @Test
@@ -268,29 +291,35 @@ class SqlGatewayRestEndpointITCase {
         assertThatThrownBy(
                         () -> {
                             try (TestingSqlGatewayRestEndpoint restServerEndpoint =
-                                    TestingSqlGatewayRestEndpoint.builder(config, service)
-                                            .withHandler(header1, testHandler)
+                                    TestingSqlGatewayRestEndpoint.builder(config, SERVICE)
+                                            .withHandler(header0, testHandler)
                                             .withHandler(badCaseHeader, testHandler)
                                             .build()) {
                                 restServerEndpoint.start();
                             }
                         })
-                .isInstanceOf(FlinkRuntimeException.class);
+                .satisfies(
+                        FlinkAssertions.anyCauseMatches(
+                                FlinkRuntimeException.class,
+                                "Duplicate REST handler instance found. Please ensure each instance is registered only once."));
     }
 
     @Test
-    void testEndpointsMustBeUnique() {
+    void testHandlerRegistrationOverlappingIsForbidden() {
         assertThatThrownBy(
                         () -> {
                             try (TestingSqlGatewayRestEndpoint restServerEndpoint =
-                                    TestingSqlGatewayRestEndpoint.builder(config, service)
+                                    TestingSqlGatewayRestEndpoint.builder(config, SERVICE)
                                             .withHandler(badCaseHeader, testHandler)
-                                            .withHandler(badCaseHeader, testVersionHandler1)
+                                            .withHandler(badCaseHeader, testVersionHandler0)
                                             .build()) {
                                 restServerEndpoint.start();
                             }
                         })
-                .isInstanceOf(FlinkRuntimeException.class);
+                .satisfies(
+                        FlinkAssertions.anyCauseMatches(
+                                FlinkRuntimeException.class,
+                                "REST handler registration overlaps with another registration for"));
     }
 
     /**
@@ -338,40 +367,6 @@ class SqlGatewayRestEndpointITCase {
         closeRestServerEndpointFuture.get(timeout.getSize(), timeout.getUnit());
     }
 
-    @Test
-    void testRestServerBindPort() throws Exception {
-        final int portRangeStart = 52300;
-        final int portRangeEnd = 52400;
-        final String address = InetAddress.getLoopbackAddress().getHostAddress();
-        final Configuration sqlGatewayRestEndpointConfig =
-                getBaseConfig(
-                        getFlinkConfig(address, address, portRangeStart + "-" + portRangeEnd));
-
-        try (RestServerEndpoint serverEndpoint1 =
-                        TestingSqlGatewayRestEndpoint.builder(sqlGatewayRestEndpointConfig, service)
-                                .build();
-                RestServerEndpoint serverEndpoint2 =
-                        TestingSqlGatewayRestEndpoint.builder(sqlGatewayRestEndpointConfig, service)
-                                .build()) {
-
-            serverEndpoint1.start();
-            serverEndpoint2.start();
-
-            assertThat(Objects.requireNonNull(serverEndpoint1.getServerAddress()).getPort())
-                    .isNotEqualTo(
-                            Objects.requireNonNull(serverEndpoint2.getServerAddress()).getPort());
-
-            assertThat(serverEndpoint1.getServerAddress().getPort())
-                    .isGreaterThanOrEqualTo(portRangeStart);
-            assertThat(serverEndpoint1.getServerAddress().getPort())
-                    .isLessThanOrEqualTo(portRangeEnd);
-            assertThat(serverEndpoint2.getServerAddress().getPort())
-                    .isGreaterThanOrEqualTo(portRangeStart);
-            assertThat(serverEndpoint2.getServerAddress().getPort())
-                    .isLessThanOrEqualTo(portRangeEnd);
-        }
-    }
-
     @Test
     void testOnUnavailableRpcEndpointReturns503() {
         CompletableFuture<TestResponse> response = sendRequestToTestHandler(new TestRequest(3));
@@ -383,54 +378,12 @@ class SqlGatewayRestEndpointITCase {
                 .isEqualTo(HttpResponseStatus.SERVICE_UNAVAILABLE);
     }
 
-    private static class TestBadCaseHandler
-            extends AbstractSqlGatewayRestHandler<
-                    TestRequest, TestResponse, EmptyMessageParameters> {
-
-        private final OneShotLatch closeLatch = new OneShotLatch();
-
-        private CompletableFuture<Void> closeFuture = CompletableFuture.completedFuture(null);
-
-        private Function<Integer, CompletableFuture<TestResponse>> handlerBody;
-
-        TestBadCaseHandler(SqlGatewayService sqlGatewayService) {
-            super(sqlGatewayService, Collections.emptyMap(), badCaseHeader);
-        }
-
-        @Override
-        public CompletableFuture<Void> closeHandlerAsync() {
-            closeLatch.trigger();
-            return closeFuture;
-        }
-
-        @Override
-        protected CompletableFuture<TestResponse> handleRequest(
-                @Nullable SqlGatewayRestAPIVersion version,
-                @NotNull HandlerRequest<TestRequest> request) {
-            final int id = request.getRequestBody().id;
-            if (id == 3) {
-                return FutureUtils.completedExceptionally(
-                        new EndpointNotStartedException("test exception"));
-            }
-            return handlerBody.apply(id);
-        }
-    }
-
-    private CompletableFuture<TestResponse> sendRequestToTestHandler(
-            final TestRequest testRequest) {
-        try {
-            return restClient.sendRequest(
-                    serverAddress.getHostName(),
-                    serverAddress.getPort(),
-                    badCaseHeader,
-                    EmptyMessageParameters.getInstance(),
-                    testRequest);
-        } catch (final IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
+    // --------------------------------------------------------------------------------------------
+    // Messages
+    // --------------------------------------------------------------------------------------------
 
     private static class TestRequest implements RequestBody {
+
         public final int id;
 
         @JsonCreator
@@ -441,7 +394,7 @@ class SqlGatewayRestEndpointITCase {
 
     private static class TestResponse implements ResponseBody {
 
-        public final String status;
+        private final String status;
 
         @JsonCreator
         public TestResponse(@JsonProperty("status") String status) {
@@ -453,6 +406,10 @@ class SqlGatewayRestEndpointITCase {
         }
     }
 
+    // --------------------------------------------------------------------------------------------
+    // Headers
+    // --------------------------------------------------------------------------------------------
+
     private static class TestBadCaseHeaders
             implements SqlGatewayMessageHeaders<TestRequest, TestResponse, EmptyMessageParameters> {
 
@@ -532,35 +489,89 @@ class SqlGatewayRestEndpointITCase {
         }
     }
 
-    private static class TestVersionSelectionHeaders1 extends TestVersionSelectionHeadersBase {
+    private static class TestVersionSelectionHeaders0 extends TestVersionSelectionHeadersBase {
         @Override
         public Collection<SqlGatewayRestAPIVersion> getSupportedAPIVersions() {
             return Collections.singleton(SqlGatewayRestAPIVersion.V0);
         }
     }
 
-    private static class TestVersionSelectionHeaders2 extends TestVersionSelectionHeadersBase {
+    private static class TestVersionSelectionHeadersNot0 extends TestVersionSelectionHeadersBase {
         @Override
         public Collection<SqlGatewayRestAPIVersion> getSupportedAPIVersions() {
-            return Collections.singleton(SqlGatewayRestAPIVersion.V1);
+            List<SqlGatewayRestAPIVersion> versions =
+                    new ArrayList<>(Arrays.asList(SqlGatewayRestAPIVersion.values()));
+            versions.remove(SqlGatewayRestAPIVersion.V0);
+            return versions;
         }
     }
 
+    // --------------------------------------------------------------------------------------------
+    // Handlers
+    // --------------------------------------------------------------------------------------------
+
     private static class TestVersionHandler
             extends AbstractSqlGatewayRestHandler<
                     EmptyRequestBody, TestResponse, EmptyMessageParameters> {
 
         TestVersionHandler(
-                final SqlGatewayService sqlGatewayService, TestVersionSelectionHeadersBase header) {
+                SqlGatewayService sqlGatewayService, TestVersionSelectionHeadersBase header) {
             super(sqlGatewayService, Collections.emptyMap(), header);
         }
 
         @Override
         protected CompletableFuture<TestResponse> handleRequest(
                 @Nullable SqlGatewayRestAPIVersion version,
-                @NotNull HandlerRequest<EmptyRequestBody> request) {
+                @Nonnull HandlerRequest<EmptyRequestBody> request) {
             assert version != null;
             return CompletableFuture.completedFuture(new TestResponse(version.name()));
         }
     }
+
+    private static class TestBadCaseHandler
+            extends AbstractSqlGatewayRestHandler<
+                    TestRequest, TestResponse, EmptyMessageParameters> {
+
+        private final OneShotLatch closeLatch = new OneShotLatch();
+
+        private CompletableFuture<Void> closeFuture = CompletableFuture.completedFuture(null);
+
+        private Function<Integer, CompletableFuture<TestResponse>> handlerBody;
+
+        TestBadCaseHandler(SqlGatewayService sqlGatewayService) {
+            super(sqlGatewayService, Collections.emptyMap(), badCaseHeader);
+        }
+
+        @Override
+        public CompletableFuture<Void> closeHandlerAsync() {
+            closeLatch.trigger();
+            return closeFuture;
+        }
+
+        @Override
+        protected CompletableFuture<TestResponse> handleRequest(
+                @Nullable SqlGatewayRestAPIVersion version,
+                @Nonnull HandlerRequest<TestRequest> request) {
+            final int id = request.getRequestBody().id;
+            if (id == 3) {
+                return FutureUtils.completedExceptionally(
+                        new EndpointNotStartedException("test exception"));
+            }
+            return handlerBody.apply(id);
+        }
+    }
+
+    private CompletableFuture<TestResponse> sendRequestToTestHandler(
+            final TestRequest testRequest) {
+        try {
+            return restClient.sendRequest(
+                    serverAddress.getHostName(),
+                    serverAddress.getPort(),
+                    badCaseHeader,
+                    EmptyMessageParameters.getInstance(),
+                    testRequest);
+        } catch (final IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
 }
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointStatementITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointStatementITCase.java
index 019bcdd8abd..4fc378e9029 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointStatementITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointStatementITCase.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.core.testutils.CommonTestUtils;
-import org.apache.flink.runtime.rest.RestClient;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.data.RowData;
@@ -32,7 +31,6 @@ import org.apache.flink.table.gateway.api.results.ResultSet;
 import org.apache.flink.table.gateway.api.session.SessionEnvironment;
 import org.apache.flink.table.gateway.api.session.SessionHandle;
 import org.apache.flink.table.gateway.api.utils.MockedEndpointVersion;
-import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
 import org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler;
 import org.apache.flink.table.gateway.rest.header.statement.ExecuteStatementHeaders;
 import org.apache.flink.table.gateway.rest.header.statement.FetchResultsHeaders;
@@ -43,11 +41,12 @@ import org.apache.flink.table.gateway.rest.message.statement.FetchResultsRespons
 import org.apache.flink.table.gateway.rest.message.statement.FetchResultsTokenParameters;
 import org.apache.flink.table.gateway.rest.serde.ResultInfo;
 import org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointExtension;
+import org.apache.flink.table.gateway.rest.util.TestingRestClient;
 import org.apache.flink.table.planner.functions.casting.RowDataToStringConverterImpl;
 import org.apache.flink.table.utils.DateTimeUtils;
-import org.apache.flink.util.ConfigurationException;
-import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Order;
 import org.junit.jupiter.api.extension.RegisterExtension;
@@ -61,8 +60,8 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
 
+import static org.apache.flink.table.gateway.rest.util.TestingRestClient.getTestingRestClient;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -78,7 +77,7 @@ class SqlGatewayRestEndpointStatementITCase extends AbstractSqlGatewayStatementI
     private static final SqlGatewayRestEndpointExtension SQL_GATEWAY_REST_ENDPOINT_EXTENSION =
             new SqlGatewayRestEndpointExtension(SQL_GATEWAY_SERVICE_EXTENSION::getService);
 
-    private static final RestClient restClient = getTestRestClient();
+    private static TestingRestClient restClient;
     private static final ExecuteStatementHeaders executeStatementHeaders =
             ExecuteStatementHeaders.getInstance();
     private static SessionMessageParameters sessionMessageParameters;
@@ -96,6 +95,16 @@ class SqlGatewayRestEndpointStatementITCase extends AbstractSqlGatewayStatementI
 
     private SessionHandle sessionHandle;
 
+    @BeforeAll
+    public static void setup() throws Exception {
+        restClient = getTestingRestClient();
+    }
+
+    @AfterAll
+    public static void cleanUp() throws Exception {
+        restClient.shutdown();
+    }
+
     @BeforeEach
     @Override
     public void before(@TempDir Path temporaryFolder) throws Exception {
@@ -193,17 +202,6 @@ class SqlGatewayRestEndpointStatementITCase extends AbstractSqlGatewayStatementI
                 .equals(RuntimeExecutionMode.STREAMING);
     }
 
-    private static RestClient getTestRestClient() {
-        try {
-            return new RestClient(
-                    new Configuration(),
-                    Executors.newFixedThreadPool(
-                            1, new ExecutorThreadFactory("rest-client-thread-pool")));
-        } catch (ConfigurationException e) {
-            throw new SqlGatewayException("Cannot get rest client.", e);
-        }
-    }
-
     private class RowDataIterator implements Iterator<RowData> {
 
         private final SessionHandle sessionHandle;
diff --git 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/UtilCaseITTest.java
 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/UtilITCase.java
similarity index 98%
rename from 
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/UtilCaseITTest.java
rename to 
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/UtilITCase.java
index 9ff06a22b63..1e0de661150 100644
--- 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/UtilCaseITTest.java
+++ 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/UtilITCase.java
@@ -43,7 +43,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
  * Test basic logic of handlers inherited from {@link 
AbstractSqlGatewayRestHandler} in util related
  * cases.
  */
-class UtilCaseITTest extends RestAPIITTestBase {
+class UtilITCase extends RestAPIITCaseBase {
 
     private static final GetInfoHeaders getInfoHeaders = 
GetInfoHeaders.getInstance();
     private static final EmptyRequestBody emptyRequestBody = 
EmptyRequestBody.getInstance();
diff --git 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/TestingRestClient.java
 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/TestingRestClient.java
new file mode 100644
index 00000000000..41ac76f2a1c
--- /dev/null
+++ 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/TestingRestClient.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.ExecutorUtils;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Utility for setting up a rest client based on {@link RestClient} with default settings.
+ *
+ * <p>The client is built from an empty {@link Configuration} and a fixed pool of one thread. It
+ * keeps a reference to that pool so that {@link #shutdown()} can release the client and the pool
+ * together.
+ */
+public class TestingRestClient extends RestClient {
+
+    /** Executor handed to the {@link RestClient} constructor; stopped in {@link #shutdown()}. */
+    private final ExecutorService executorService;
+
+    private TestingRestClient(ExecutorService executorService) throws ConfigurationException {
+        super(new Configuration(), executorService);
+        this.executorService = executorService;
+    }
+
+    /**
+     * Creates a new client backed by a one-thread pool whose thread factory is named {@code
+     * "rest-client-thread-pool"}.
+     *
+     * @return a new client; callers should invoke {@link #shutdown()} when they are done with it
+     * @throws Exception if the underlying {@link RestClient} cannot be constructed
+     */
+    public static TestingRestClient getTestingRestClient() throws Exception {
+        return new TestingRestClient(
+                Executors.newFixedThreadPool(
+                        1, new ExecutorThreadFactory("rest-client-thread-pool")));
+    }
+
+    /**
+     * Closes the client, waits for the close to finish, and then stops the executor.
+     *
+     * <p>The client is closed before the executor is stopped because the client was constructed
+     * with that executor and may still hand work to it while closing. The executor is stopped in
+     * a {@code finally} block so it is released even if closing the client fails.
+     *
+     * @throws Exception if waiting for the client to close fails
+     */
+    public void shutdown() throws Exception {
+        try {
+            super.closeAsync().get();
+        } finally {
+            ExecutorUtils.gracefulShutdown(1, TimeUnit.SECONDS, executorService);
+        }
+    }
+}
diff --git 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/TestingSqlGatewayRestEndpoint.java
 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/TestingSqlGatewayRestEndpoint.java
similarity index 85%
rename from 
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/TestingSqlGatewayRestEndpoint.java
rename to 
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/TestingSqlGatewayRestEndpoint.java
index 9fef8b86e87..a2424dd41b1 100644
--- 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/TestingSqlGatewayRestEndpoint.java
+++ 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/TestingSqlGatewayRestEndpoint.java
@@ -16,12 +16,13 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.gateway.rest;
+package org.apache.flink.table.gateway.rest.util;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import org.apache.flink.table.gateway.api.SqlGatewayService;
+import org.apache.flink.table.gateway.rest.SqlGatewayRestEndpoint;
 import org.apache.flink.util.ConfigurationException;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
@@ -35,9 +36,10 @@ import java.util.concurrent.CompletableFuture;
  * Utility for setting up a rest server based on {@link 
SqlGatewayRestEndpoint} with a given set of
  * handlers.
  */
-class TestingSqlGatewayRestEndpoint extends SqlGatewayRestEndpoint {
+public class TestingSqlGatewayRestEndpoint extends SqlGatewayRestEndpoint {
 
-    static Builder builder(Configuration configuration, SqlGatewayService 
sqlGatewayService) {
+    public static Builder builder(
+            Configuration configuration, SqlGatewayService sqlGatewayService) {
         return new Builder(configuration, sqlGatewayService);
     }
 
@@ -45,7 +47,7 @@ class TestingSqlGatewayRestEndpoint extends 
SqlGatewayRestEndpoint {
      * TestSqlGatewayRestEndpoint.Builder is a utility class for instantiating 
a
      * TestSqlGatewayRestEndpoint.
      */
-    static class Builder {
+    public static class Builder {
 
         private final Configuration configuration;
         private final List<Tuple2<RestHandlerSpecification, 
ChannelInboundHandler>> handlers =
@@ -57,17 +59,17 @@ class TestingSqlGatewayRestEndpoint extends 
SqlGatewayRestEndpoint {
             this.sqlGatewayService = sqlGatewayService;
         }
 
-        Builder withHandler(
+        public Builder withHandler(
                 RestHandlerSpecification messageHeaders, ChannelInboundHandler 
handler) {
             this.handlers.add(Tuple2.of(messageHeaders, handler));
             return this;
         }
 
-        TestingSqlGatewayRestEndpoint build() throws IOException, 
ConfigurationException {
+        public TestingSqlGatewayRestEndpoint build() throws IOException, 
ConfigurationException {
             return new TestingSqlGatewayRestEndpoint(configuration, handlers, 
sqlGatewayService);
         }
 
-        TestingSqlGatewayRestEndpoint buildAndStart() throws Exception {
+        public TestingSqlGatewayRestEndpoint buildAndStart() throws Exception {
             TestingSqlGatewayRestEndpoint serverEndpoint = build();
             serverEndpoint.start();
 
diff --git 
a/flink-table/flink-sql-gateway/src/test/resources/sql_gateway_rest_api_v2.snapshot
 
b/flink-table/flink-sql-gateway/src/test/resources/sql_gateway_rest_api_v2.snapshot
new file mode 100644
index 00000000000..1ed5ab30ab3
--- /dev/null
+++ 
b/flink-table/flink-sql-gateway/src/test/resources/sql_gateway_rest_api_v2.snapshot
@@ -0,0 +1,357 @@
+{
+  "calls" : [ {
+    "url" : "/api_versions",
+    "method" : "GET",
+    "status-code" : "200 OK",
+    "file-upload" : false,
+    "path-parameters" : {
+      "pathParameters" : [ ]
+    },
+    "query-parameters" : {
+      "queryParameters" : [ ]
+    },
+    "request" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
+    },
+    "response" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:table:gateway:rest:message:util:GetApiVersionResponseBody",
+      "properties" : {
+        "versions" : {
+          "type" : "array",
+          "items" : {
+            "type" : "string"
+          }
+        }
+      }
+    }
+  }, {
+    "url" : "/info",
+    "method" : "GET",
+    "status-code" : "200 OK",
+    "file-upload" : false,
+    "path-parameters" : {
+      "pathParameters" : [ ]
+    },
+    "query-parameters" : {
+      "queryParameters" : [ ]
+    },
+    "request" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
+    },
+    "response" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:table:gateway:rest:message:util:GetInfoResponseBody",
+      "properties" : {
+        "productName" : {
+          "type" : "string"
+        },
+        "version" : {
+          "type" : "string"
+        }
+      }
+    }
+  }, {
+    "url" : "/sessions",
+    "method" : "POST",
+    "status-code" : "200 OK",
+    "file-upload" : false,
+    "path-parameters" : {
+      "pathParameters" : [ ]
+    },
+    "query-parameters" : {
+      "queryParameters" : [ ]
+    },
+    "request" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:table:gateway:rest:message:session:OpenSessionRequestBody",
+      "properties" : {
+        "sessionName" : {
+          "type" : "string"
+        },
+        "properties" : {
+          "type" : "object",
+          "additionalProperties" : {
+            "type" : "string"
+          }
+        }
+      }
+    },
+    "response" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:table:gateway:rest:message:session:OpenSessionResponseBody",
+      "properties" : {
+        "sessionHandle" : {
+          "type" : "string"
+        }
+      }
+    }
+  }, {
+    "url" : "/sessions/:session_handle",
+    "method" : "DELETE",
+    "status-code" : "200 OK",
+    "file-upload" : false,
+    "path-parameters" : {
+      "pathParameters" : [ {
+        "key" : "session_handle"
+      } ]
+    },
+    "query-parameters" : {
+      "queryParameters" : [ ]
+    },
+    "request" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
+    },
+    "response" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:table:gateway:rest:message:session:CloseSessionResponseBody",
+      "properties" : {
+        "status" : {
+          "type" : "string"
+        }
+      }
+    }
+  }, {
+    "url" : "/sessions/:session_handle",
+    "method" : "GET",
+    "status-code" : "200 OK",
+    "file-upload" : false,
+    "path-parameters" : {
+      "pathParameters" : [ {
+        "key" : "session_handle"
+      } ]
+    },
+    "query-parameters" : {
+      "queryParameters" : [ ]
+    },
+    "request" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
+    },
+    "response" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:table:gateway:rest:message:session:GetSessionConfigResponseBody",
+      "properties" : {
+        "properties" : {
+          "type" : "object",
+          "additionalProperties" : {
+            "type" : "string"
+          }
+        }
+      }
+    }
+  }, {
+    "url" : "/sessions/:session_handle/configure-session",
+    "method" : "POST",
+    "status-code" : "200 OK",
+    "file-upload" : false,
+    "path-parameters" : {
+      "pathParameters" : [ {
+        "key" : "session_handle"
+      } ]
+    },
+    "query-parameters" : {
+      "queryParameters" : [ ]
+    },
+    "request" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:table:gateway:rest:message:session:ConfigureSessionRequestBody",
+      "properties" : {
+        "statement" : {
+          "type" : "string"
+        },
+        "executionTimeout" : {
+          "type" : "integer"
+        }
+      }
+    },
+    "response" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyResponseBody"
+    }
+  }, {
+    "url" : "/sessions/:session_handle/heartbeat",
+    "method" : "POST",
+    "status-code" : "200 OK",
+    "file-upload" : false,
+    "path-parameters" : {
+      "pathParameters" : [ {
+        "key" : "session_handle"
+      } ]
+    },
+    "query-parameters" : {
+      "queryParameters" : [ ]
+    },
+    "request" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
+    },
+    "response" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyResponseBody"
+    }
+  }, {
+    "url" : "/sessions/:session_handle/operations/:operation_handle/cancel",
+    "method" : "POST",
+    "status-code" : "200 OK",
+    "file-upload" : false,
+    "path-parameters" : {
+      "pathParameters" : [ {
+        "key" : "session_handle"
+      }, {
+        "key" : "operation_handle"
+      } ]
+    },
+    "query-parameters" : {
+      "queryParameters" : [ ]
+    },
+    "request" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
+    },
+    "response" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:table:gateway:rest:message:operation:OperationStatusResponseBody",
+      "properties" : {
+        "status" : {
+          "type" : "string"
+        }
+      }
+    }
+  }, {
+    "url" : "/sessions/:session_handle/operations/:operation_handle/close",
+    "method" : "DELETE",
+    "status-code" : "200 OK",
+    "file-upload" : false,
+    "path-parameters" : {
+      "pathParameters" : [ {
+        "key" : "session_handle"
+      }, {
+        "key" : "operation_handle"
+      } ]
+    },
+    "query-parameters" : {
+      "queryParameters" : [ ]
+    },
+    "request" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
+    },
+    "response" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:table:gateway:rest:message:operation:OperationStatusResponseBody",
+      "properties" : {
+        "status" : {
+          "type" : "string"
+        }
+      }
+    }
+  }, {
+    "url" : 
"/sessions/:session_handle/operations/:operation_handle/result/:token",
+    "method" : "GET",
+    "status-code" : "200 OK",
+    "file-upload" : false,
+    "path-parameters" : {
+      "pathParameters" : [ {
+        "key" : "session_handle"
+      }, {
+        "key" : "operation_handle"
+      }, {
+        "key" : "token"
+      } ]
+    },
+    "query-parameters" : {
+      "queryParameters" : [ ]
+    },
+    "request" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
+    },
+    "response" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:table:gateway:rest:message:statement:FetchResultsResponseBody",
+      "properties" : {
+        "results" : {
+          "type" : "any"
+        },
+        "resultType" : {
+          "type" : "string"
+        },
+        "nextResultUri" : {
+          "type" : "string"
+        }
+      }
+    }
+  }, {
+    "url" : "/sessions/:session_handle/operations/:operation_handle/status",
+    "method" : "GET",
+    "status-code" : "200 OK",
+    "file-upload" : false,
+    "path-parameters" : {
+      "pathParameters" : [ {
+        "key" : "session_handle"
+      }, {
+        "key" : "operation_handle"
+      } ]
+    },
+    "query-parameters" : {
+      "queryParameters" : [ ]
+    },
+    "request" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
+    },
+    "response" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:table:gateway:rest:message:operation:OperationStatusResponseBody",
+      "properties" : {
+        "status" : {
+          "type" : "string"
+        }
+      }
+    }
+  }, {
+    "url" : "/sessions/:session_handle/statements",
+    "method" : "POST",
+    "status-code" : "200 OK",
+    "file-upload" : false,
+    "path-parameters" : {
+      "pathParameters" : [ {
+        "key" : "session_handle"
+      } ]
+    },
+    "query-parameters" : {
+      "queryParameters" : [ ]
+    },
+    "request" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:table:gateway:rest:message:statement:ExecuteStatementRequestBody",
+      "properties" : {
+        "statement" : {
+          "type" : "string"
+        },
+        "executionTimeout" : {
+          "type" : "integer"
+        },
+        "executionConfig" : {
+          "type" : "object",
+          "additionalProperties" : {
+            "type" : "string"
+          }
+        }
+      }
+    },
+    "response" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:table:gateway:rest:message:statement:ExecuteStatementResponseBody",
+      "properties" : {
+        "operationHandle" : {
+          "type" : "string"
+        }
+      }
+    }
+  } ]
+}

Reply via email to