This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 95cb47ef53c [FLINK-30449][sql-gateway] Introduce ResultInfo to improve the serde of FetchResultsResponseBody (#21523)
95cb47ef53c is described below

commit 95cb47ef53c822161ca90cdca43ad95fd1332633
Author: yuzelin <33053040+yuze...@users.noreply.github.com>
AuthorDate: Tue Dec 27 09:56:32 2022 +0800

    [FLINK-30449][sql-gateway] Introduce ResultInfo to improve the serde of FetchResultsResponseBody (#21523)
    
    This closes #21523
---
 .../flink/table/gateway/api/results/ResultSet.java |   9 +-
 .../handler/statement/FetchResultsHandler.java     |   4 +-
 ...nHeaders.java => AbstractOperationHeaders.java} |   2 +-
 .../header/operation/CancelOperationHeaders.java   |   2 +-
 .../header/operation/CloseOperationHeaders.java    |   2 +-
 .../operation/GetOperationStatusHeaders.java       |   2 +-
 .../header/statement/ExecuteStatementHeaders.java  |   2 +-
 .../operation/OperationHandleIdPathParameter.java  |   2 +-
 .../statement/ExecuteStatementRequestBody.java     |   2 +-
 .../statement/ExecuteStatementResponseBody.java    |   2 +-
 .../statement/FetchResultsResponseBody.java        |  20 +-
 .../statement/FetchResultsTokenParameters.java     |   2 +-
 .../statement/FetchResultsTokenPathParameter.java  |   2 +-
 .../flink/table/gateway/rest/serde/ColumnInfo.java |  14 +-
 .../rest/serde/JsonResultSetDeserializer.java      | 130 -----------
 .../rest/serde/JsonResultSetSerializer.java        | 127 -----------
 .../flink/table/gateway/rest/serde/ResultInfo.java | 111 ++++++++++
 .../rest/serde/ResultInfoJsonDeserializer.java     | 109 +++++++++
 .../rest/serde/ResultInfoJsonSerializer.java       | 138 ++++++++++++
 .../table/gateway/rest/serde/RowDataInfo.java      |  59 -----
 .../SqlGatewayRestEndpointStatementITCase.java     |  18 +-
 ...SerDeTest.java => ResultInfoJsonSerDeTest.java} | 246 ++++++++++-----------
 .../src/test/resources/resultInfo.txt              |   1 +
 pom.xml                                            |   1 +
 24 files changed, 531 insertions(+), 476 deletions(-)

diff --git 
a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/ResultSet.java
 
b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/ResultSet.java
index c13e7d82fbe..6e397e22193 100644
--- 
a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/ResultSet.java
+++ 
b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/ResultSet.java
@@ -25,7 +25,6 @@ import org.apache.flink.table.data.RowData;
 
 import javax.annotation.Nullable;
 
-import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
@@ -33,13 +32,7 @@ import java.util.stream.Collectors;
 
 /** The collection of the results. */
 @PublicEvolving
-public class ResultSet implements Serializable {
-
-    private static final long serialVersionUID = 1L;
-
-    public static final String FIELD_NAME_COLUMN_INFOS = "columns";
-
-    public static final String FIELD_NAME_DATA = "data";
+public class ResultSet {
 
     private final ResultType resultType;
 
diff --git 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/statement/FetchResultsHandler.java
 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/statement/FetchResultsHandler.java
index c8945974cef..52b18be0bed 100644
--- 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/statement/FetchResultsHandler.java
+++ 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/statement/FetchResultsHandler.java
@@ -33,6 +33,7 @@ import 
org.apache.flink.table.gateway.rest.message.session.SessionHandleIdPathPa
 import 
org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody;
 import 
org.apache.flink.table.gateway.rest.message.statement.FetchResultsTokenParameters;
 import 
org.apache.flink.table.gateway.rest.message.statement.FetchResultsTokenPathParameter;
+import org.apache.flink.table.gateway.rest.serde.ResultInfo;
 import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion;
 
 import javax.annotation.Nonnull;
@@ -86,6 +87,7 @@ public class FetchResultsHandler
                         nextToken);
 
         return CompletableFuture.completedFuture(
-                new FetchResultsResponseBody(resultSet, resultType, 
nextResultUri));
+                new FetchResultsResponseBody(
+                        ResultInfo.createResultInfo(resultSet), resultType, 
nextResultUri));
     }
 }
diff --git 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/operation/AbstactOperationHeaders.java
 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/operation/AbstractOperationHeaders.java
similarity index 97%
rename from 
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/operation/AbstactOperationHeaders.java
rename to 
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/operation/AbstractOperationHeaders.java
index 22d3ac07cb4..b842ff64241 100644
--- 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/operation/AbstactOperationHeaders.java
+++ 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/operation/AbstractOperationHeaders.java
@@ -26,7 +26,7 @@ import 
org.apache.flink.table.gateway.rest.message.operation.OperationStatusResp
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 /** Abstract operation related message headers. */
-public abstract class AbstactOperationHeaders
+public abstract class AbstractOperationHeaders
         implements SqlGatewayMessageHeaders<
                 EmptyRequestBody, OperationStatusResponseBody, 
OperationMessageParameters> {
 
diff --git 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/operation/CancelOperationHeaders.java
 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/operation/CancelOperationHeaders.java
index e25a31c6fbe..958778ae9fc 100644
--- 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/operation/CancelOperationHeaders.java
+++ 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/operation/CancelOperationHeaders.java
@@ -23,7 +23,7 @@ import 
org.apache.flink.table.gateway.rest.message.operation.OperationHandleIdPa
 import 
org.apache.flink.table.gateway.rest.message.session.SessionHandleIdPathParameter;
 
 /** Message headers for canceling operation. */
-public class CancelOperationHeaders extends AbstactOperationHeaders {
+public class CancelOperationHeaders extends AbstractOperationHeaders {
 
     private static final CancelOperationHeaders INSTANCE = new 
CancelOperationHeaders();
 
diff --git 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/operation/CloseOperationHeaders.java
 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/operation/CloseOperationHeaders.java
index 1b9ae4ea4ec..c7b53888626 100644
--- 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/operation/CloseOperationHeaders.java
+++ 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/operation/CloseOperationHeaders.java
@@ -23,7 +23,7 @@ import 
org.apache.flink.table.gateway.rest.message.operation.OperationHandleIdPa
 import 
org.apache.flink.table.gateway.rest.message.session.SessionHandleIdPathParameter;
 
 /** Message headers for closing operation. */
-public class CloseOperationHeaders extends AbstactOperationHeaders {
+public class CloseOperationHeaders extends AbstractOperationHeaders {
 
     private static final CloseOperationHeaders INSTANCE = new 
CloseOperationHeaders();
 
diff --git 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/operation/GetOperationStatusHeaders.java
 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/operation/GetOperationStatusHeaders.java
index f05d1538667..791b9294067 100644
--- 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/operation/GetOperationStatusHeaders.java
+++ 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/operation/GetOperationStatusHeaders.java
@@ -23,7 +23,7 @@ import 
org.apache.flink.table.gateway.rest.message.operation.OperationHandleIdPa
 import 
org.apache.flink.table.gateway.rest.message.session.SessionHandleIdPathParameter;
 
 /** Message headers for getting the status of operation. */
-public class GetOperationStatusHeaders extends AbstactOperationHeaders {
+public class GetOperationStatusHeaders extends AbstractOperationHeaders {
 
     private static final GetOperationStatusHeaders INSTANCE = new 
GetOperationStatusHeaders();
 
diff --git 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/statement/ExecuteStatementHeaders.java
 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/statement/ExecuteStatementHeaders.java
index fb30d3fb83a..5c36745edd9 100644
--- 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/statement/ExecuteStatementHeaders.java
+++ 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/statement/ExecuteStatementHeaders.java
@@ -27,7 +27,7 @@ import 
org.apache.flink.table.gateway.rest.message.statement.ExecuteStatementRes
 
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
-/** Message headers for execute a statement. */
+/** Message headers for executing a statement. */
 public class ExecuteStatementHeaders
         implements SqlGatewayMessageHeaders<
                 ExecuteStatementRequestBody,
diff --git 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/operation/OperationHandleIdPathParameter.java
 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/operation/OperationHandleIdPathParameter.java
index 26df625210a..f3e97a66701 100644
--- 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/operation/OperationHandleIdPathParameter.java
+++ 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/operation/OperationHandleIdPathParameter.java
@@ -23,7 +23,7 @@ import 
org.apache.flink.table.gateway.api.operation.OperationHandle;
 
 import java.util.UUID;
 
-/** {@link MessagePathParameter} that parse the {@link OperationHandle}. */
+/** {@link MessagePathParameter} that parses the {@link OperationHandle}. */
 public class OperationHandleIdPathParameter extends 
MessagePathParameter<OperationHandle> {
 
     public static final String KEY = "operation_handle";
diff --git 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/ExecuteStatementRequestBody.java
 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/ExecuteStatementRequestBody.java
index 6b0ac225cfa..00e05b18bfe 100644
--- 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/ExecuteStatementRequestBody.java
+++ 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/ExecuteStatementRequestBody.java
@@ -27,7 +27,7 @@ import javax.annotation.Nullable;
 
 import java.util.Map;
 
-/** {@link RequestBody} for execute a statement. */
+/** {@link RequestBody} for executing a statement. */
 @JsonInclude(JsonInclude.Include.NON_NULL)
 public class ExecuteStatementRequestBody implements RequestBody {
 
diff --git 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/ExecuteStatementResponseBody.java
 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/ExecuteStatementResponseBody.java
index 80faaf5986d..086bc2bf5ce 100644
--- 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/ExecuteStatementResponseBody.java
+++ 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/ExecuteStatementResponseBody.java
@@ -23,7 +23,7 @@ import org.apache.flink.runtime.rest.messages.ResponseBody;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
-/** {@link ResponseBody} for execute a statement. */
+/** {@link ResponseBody} for executing a statement. */
 @JsonInclude(JsonInclude.Include.NON_NULL)
 public class ExecuteStatementResponseBody implements ResponseBody {
 
diff --git 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultsResponseBody.java
 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultsResponseBody.java
index bff14c98b40..ccdaee00731 100644
--- 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultsResponseBody.java
+++ 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultsResponseBody.java
@@ -19,17 +19,18 @@
 package org.apache.flink.table.gateway.rest.message.statement;
 
 import org.apache.flink.runtime.rest.messages.ResponseBody;
-import org.apache.flink.table.gateway.api.results.ResultSet;
-import org.apache.flink.table.gateway.rest.serde.JsonResultSetDeserializer;
-import org.apache.flink.table.gateway.rest.serde.JsonResultSetSerializer;
+import org.apache.flink.table.gateway.rest.serde.ResultInfo;
+import org.apache.flink.table.gateway.rest.serde.ResultInfoJsonDeserializer;
+import org.apache.flink.table.gateway.rest.serde.ResultInfoJsonSerializer;
 
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
 import javax.annotation.Nullable;
 
-/** {@link ResponseBody} for execute a statement. */
+/** {@link ResponseBody} for executing a statement. */
 public class FetchResultsResponseBody implements ResponseBody {
 
     private static final String FIELD_RESULT_TYPE = "resultType";
@@ -37,9 +38,9 @@ public class FetchResultsResponseBody implements ResponseBody 
{
     private static final String FIELD_NEXT_RESULT_URI = "nextResultUri";
 
     @JsonProperty(FIELD_RESULTS)
-    @JsonSerialize(using = JsonResultSetSerializer.class)
-    @JsonDeserialize(using = JsonResultSetDeserializer.class)
-    private final ResultSet results;
+    @JsonSerialize(using = ResultInfoJsonSerializer.class)
+    @JsonDeserialize(using = ResultInfoJsonDeserializer.class)
+    private final ResultInfo results;
 
     @JsonProperty(FIELD_RESULT_TYPE)
     private final String resultType;
@@ -48,8 +49,9 @@ public class FetchResultsResponseBody implements ResponseBody 
{
     @Nullable
     private final String nextResultUri;
 
+    @JsonCreator
     public FetchResultsResponseBody(
-            @JsonProperty(FIELD_RESULTS) ResultSet results,
+            @JsonProperty(FIELD_RESULTS) ResultInfo results,
             @JsonProperty(FIELD_RESULT_TYPE) String resultType,
             @Nullable @JsonProperty(FIELD_NEXT_RESULT_URI) String 
nextResultUri) {
         this.results = results;
@@ -57,7 +59,7 @@ public class FetchResultsResponseBody implements ResponseBody 
{
         this.nextResultUri = nextResultUri;
     }
 
-    public ResultSet getResults() {
+    public ResultInfo getResults() {
         return results;
     }
 
diff --git 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultsTokenParameters.java
 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultsTokenParameters.java
index aa014c44464..132dc631a7b 100644
--- 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultsTokenParameters.java
+++ 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultsTokenParameters.java
@@ -30,7 +30,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 
-/** {@link MessagePathParameter} for fetch results. */
+/** {@link MessagePathParameter} for fetching results. */
 public class FetchResultsTokenParameters extends MessageParameters {
 
     private final SessionHandleIdPathParameter sessionHandleIdPathParameter =
diff --git 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultsTokenPathParameter.java
 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultsTokenPathParameter.java
index f3cb420585c..c91fe56ba98 100644
--- 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultsTokenPathParameter.java
+++ 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultsTokenPathParameter.java
@@ -20,7 +20,7 @@ package org.apache.flink.table.gateway.rest.message.statement;
 
 import org.apache.flink.runtime.rest.messages.MessagePathParameter;
 
-/** {@link MessagePathParameter} that parse the token string. */
+/** {@link MessagePathParameter} that parses the token string. */
 public class FetchResultsTokenPathParameter extends MessagePathParameter<Long> 
{
 
     public static final String KEY = "token";
diff --git 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ColumnInfo.java
 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ColumnInfo.java
index d07218430c0..658bdc1854c 100644
--- 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ColumnInfo.java
+++ 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ColumnInfo.java
@@ -19,7 +19,9 @@
 package org.apache.flink.table.gateway.rest.serde;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
 import org.apache.flink.util.Preconditions;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
@@ -60,8 +62,16 @@ public class ColumnInfo {
         this.comment = comment;
     }
 
-    public static ColumnInfo create(String name, LogicalType type, @Nullable 
String comment) {
-        return new ColumnInfo(name, type, comment);
+    public static ColumnInfo toColumnInfo(Column column) {
+        return new ColumnInfo(
+                column.getName(),
+                column.getDataType().getLogicalType(),
+                column.getComment().orElse(null));
+    }
+
+    public Column toColumn() {
+        return Column.physical(name, 
DataTypeUtils.toInternalDataType(logicalType))
+                .withComment(comment);
     }
 
     public String getName() {
diff --git 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/JsonResultSetDeserializer.java
 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/JsonResultSetDeserializer.java
deleted file mode 100644
index 77c6c8976ce..00000000000
--- 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/JsonResultSetDeserializer.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.gateway.rest.serde;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.formats.common.TimestampFormat;
-import org.apache.flink.formats.json.JsonToRowDataConverters;
-import org.apache.flink.table.catalog.Column;
-import org.apache.flink.table.catalog.ResolvedSchema;
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.binary.BinaryStringData;
-import org.apache.flink.table.gateway.api.results.ResultSet;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.utils.DataTypeUtils;
-import org.apache.flink.types.RowKind;
-
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-import static 
org.apache.flink.table.gateway.api.results.ResultSet.FIELD_NAME_COLUMN_INFOS;
-import static 
org.apache.flink.table.gateway.api.results.ResultSet.FIELD_NAME_DATA;
-
-/** Json deserializer for {@link ResultSet}. */
-@Internal
-public class JsonResultSetDeserializer extends StdDeserializer<ResultSet> {
-
-    private static final long serialVersionUID = 1L;
-
-    public JsonResultSetDeserializer() {
-        super(ResultSet.class);
-    }
-
-    private static final JsonToRowDataConverters TO_ROWDATA_CONVERTERS =
-            new JsonToRowDataConverters(false, false, 
TimestampFormat.ISO_8601);
-
-    @Override
-    public ResultSet deserialize(JsonParser jsonParser, DeserializationContext 
ctx)
-            throws IOException {
-        JsonNode node = jsonParser.getCodec().readTree(jsonParser);
-        ResolvedSchema resolvedSchema;
-        List<RowData> data = new ArrayList<>();
-
-        // Deserialize column infos
-        ColumnInfo[] columnInfos =
-                jsonParser
-                        .getCodec()
-                        .treeToValue(node.get(FIELD_NAME_COLUMN_INFOS), 
ColumnInfo[].class);
-        List<Column> columns = new ArrayList<>();
-        for (ColumnInfo columnInfo : columnInfos) {
-            LogicalType logicalType = columnInfo.getLogicalType();
-            columns.add(
-                    Column.physical(
-                            columnInfo.getName(), 
DataTypeUtils.toInternalDataType(logicalType)));
-        }
-
-        // Parse the schema from Column
-        resolvedSchema = ResolvedSchema.of(columns);
-        // The fieldType for all RowData
-        List<LogicalType> fieldTypes =
-                resolvedSchema.getColumnDataTypes().stream()
-                        .map(DataType::getLogicalType)
-                        .collect(Collectors.toList());
-
-        // Generate converters for all fieldTypes
-        List<JsonToRowDataConverters.JsonToRowDataConverter> converters =
-                fieldTypes.stream()
-                        .map(TO_ROWDATA_CONVERTERS::createConverter)
-                        .collect(Collectors.toList());
-
-        // Get the RowDataInfo
-        JsonParser dataParser = node.get(FIELD_NAME_DATA).traverse();
-        dataParser.nextToken();
-        RowDataInfo[] rowDataInfos = ctx.readValue(dataParser, 
RowDataInfo[].class);
-
-        // Parse the RowData from RowDataInfo
-        for (RowDataInfo rowDataInfo : rowDataInfos) {
-            RowKind rowKind = RowKind.valueOf(rowDataInfo.getKind());
-            GenericRowData rowData = new GenericRowData(rowKind, 
rowDataInfo.getFields().size());
-            List<JsonNode> fields = rowDataInfo.getFields();
-            // Setting fields of one RowData
-            for (int i = 0; i < rowData.getArity(); ++i) {
-                JsonNode jsonNode = fields.get(i);
-                JsonToRowDataConverters.JsonToRowDataConverter converter = 
converters.get(i);
-                Object object = 
buildObjectValueConverter(converter).apply(jsonNode);
-                if (object != null && object.toString().equals("null")) {
-                    rowData.setField(i, new BinaryStringData(""));
-                } else {
-                    rowData.setField(i, object);
-                }
-            }
-            data.add(rowData);
-        }
-
-        // The placeholder is used to build a ResultSet
-        ResultSet.ResultType resultTypePlaceHolder = 
ResultSet.ResultType.PAYLOAD;
-        Long nextTokenPlaceHolder = 0L;
-        return new ResultSet(resultTypePlaceHolder, nextTokenPlaceHolder, 
resolvedSchema, data);
-    }
-
-    private static Function<JsonNode, Object> buildObjectValueConverter(
-            JsonToRowDataConverters.JsonToRowDataConverter converter) {
-        return converter::convert;
-    }
-}
diff --git 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/JsonResultSetSerializer.java
 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/JsonResultSetSerializer.java
deleted file mode 100644
index 3bdb1687058..00000000000
--- 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/JsonResultSetSerializer.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.gateway.rest.serde;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.formats.common.TimestampFormat;
-import org.apache.flink.formats.json.JsonFormatOptions;
-import org.apache.flink.formats.json.RowDataToJsonConverters;
-import org.apache.flink.table.catalog.Column;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.gateway.api.results.ResultSet;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.types.RowKind;
-
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-import static 
org.apache.flink.table.gateway.api.results.ResultSet.FIELD_NAME_COLUMN_INFOS;
-import static 
org.apache.flink.table.gateway.api.results.ResultSet.FIELD_NAME_DATA;
-
-/** Json serializer for {@link ResultSet}. */
-@Internal
-public class JsonResultSetSerializer extends StdSerializer<ResultSet> {
-
-    private static final long serialVersionUID = 1L;
-
-    public JsonResultSetSerializer() {
-        super(ResultSet.class);
-    }
-
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-    private static final RowDataToJsonConverters TO_JSON_CONVERTERS =
-            new RowDataToJsonConverters(
-                    TimestampFormat.ISO_8601, 
JsonFormatOptions.MapNullKeyMode.LITERAL, "null");
-
-    @Override
-    public void serialize(
-            ResultSet resultSet, JsonGenerator jsonGenerator, 
SerializerProvider serializerProvider)
-            throws IOException {
-        jsonGenerator.writeStartObject();
-
-        // 1.Serialize ColumnInfo
-        List<Column> columns = resultSet.getResultSchema().getColumns();
-        List<ColumnInfo> columnInfos = new ArrayList<>();
-        for (Column column : columns) {
-            columnInfos.add(
-                    new ColumnInfo(
-                            column.getName(),
-                            column.getDataType().getLogicalType(),
-                            column.getComment().orElse(null)));
-        }
-        serializerProvider.defaultSerializeField(
-                FIELD_NAME_COLUMN_INFOS, columnInfos, jsonGenerator);
-
-        // 2.Serialize RowData
-
-        // The fieldGetters for all RowData
-        List<RowData.FieldGetter> fieldGetters = new ArrayList<>();
-        for (int i = 0; i < resultSet.getResultSchema().getColumnCount(); i++) 
{
-            fieldGetters.add(
-                    RowData.createFieldGetter(
-                            resultSet
-                                    .getResultSchema()
-                                    .getColumnDataTypes()
-                                    .get(i)
-                                    .getLogicalType(),
-                            i));
-        }
-
-        // The fieldType for all RowData
-        List<LogicalType> fieldTypes =
-                resultSet.getResultSchema().getColumnDataTypes().stream()
-                        .map(DataType::getLogicalType)
-                        .collect(Collectors.toList());
-
-        // Generate converters for all fieldTypes
-        List<RowDataToJsonConverters.RowDataToJsonConverter> converters =
-                fieldTypes.stream()
-                        .map(TO_JSON_CONVERTERS::createConverter)
-                        .collect(Collectors.toList());
-        List<RowDataInfo> data = new ArrayList<>();
-        for (RowData row : resultSet.getData()) {
-            RowKind rowKind = row.getRowKind();
-            List<JsonNode> fields = new ArrayList<>();
-            for (int i = 0; i < row.getArity(); ++i) {
-                Object field = fieldGetters.get(i).getFieldOrNull(row);
-                RowDataToJsonConverters.RowDataToJsonConverter converter = 
converters.get(i);
-                fields.add(buildJsonValueConverter(converter).apply(field));
-            }
-            data.add(new RowDataInfo(rowKind.name(), fields));
-        }
-
-        serializerProvider.defaultSerializeField(FIELD_NAME_DATA, data, 
jsonGenerator);
-        jsonGenerator.writeEndObject();
-    }
-
-    private static Function<Object, JsonNode> buildJsonValueConverter(
-            RowDataToJsonConverters.RowDataToJsonConverter converter) {
-        return field -> converter.convert(OBJECT_MAPPER, null, field);
-    }
-}
diff --git 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfo.java
 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfo.java
new file mode 100644
index 00000000000..c7ec5b05ff9
--- /dev/null
+++ 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfo.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest.serde;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * A {@code ResultInfo} contains information of a {@link ResultSet}. It is designed for transferring
+ * the information of ResultSet via REST. For its serialization and deserialization, See:
+ *
+ * <p>{@link ResultInfoJsonSerializer} and {@link ResultInfoJsonDeserializer}
+ */
+@Internal
+public class ResultInfo {
+
+    // Columns
+    public static final String FIELD_NAME_COLUMN_INFOS = "columns";
+
+    // RowData
+    public static final String FIELD_NAME_DATA = "data";
+    public static final String FIELD_NAME_KIND = "kind";
+    public static final String FIELD_NAME_FIELDS = "fields";
+
+    private final List<ColumnInfo> columnInfos;
+    private final List<RowData> data;
+
+    public ResultInfo(List<ColumnInfo> columnInfos, List<RowData> data) {
+        this.columnInfos = columnInfos;
+        this.data = data;
+    }
+
+    public static ResultInfo createResultInfo(ResultSet resultSet) {
+        return new ResultInfo(
+                resultSet.getResultSchema().getColumns().stream()
+                        .map(ColumnInfo::toColumnInfo)
+                        .collect(Collectors.toList()),
+                resultSet.getData());
+    }
+
+    public List<ColumnInfo> getColumnInfos() {
+        return Collections.unmodifiableList(columnInfos);
+    }
+
+    public List<RowData> getData() {
+        return data;
+    }
+
+    public List<RowData.FieldGetter> getFieldGetters() {
+        List<LogicalType> columnTypes =
+                columnInfos.stream().map(ColumnInfo::getLogicalType).collect(Collectors.toList());
+        return IntStream.range(0, columnTypes.size())
+                .mapToObj(i -> RowData.createFieldGetter(columnTypes.get(i), i))
+                .collect(Collectors.toList());
+    }
+
+    public ResolvedSchema getResultSchema() {
+        return ResolvedSchema.of(
+                columnInfos.stream().map(ColumnInfo::toColumn).collect(Collectors.toList()));
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(columnInfos, data);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof ResultInfo)) {
+            return false;
+        }
+        ResultInfo that = (ResultInfo) o;
+        return Objects.equals(columnInfos, that.columnInfos) && Objects.equals(data, that.data);
+    }
+
+    @Override
+    public String toString() {
+        return String.format(
+                "ResultInfo{\n  columnInfos=[%s],\n  rows=[%s]\n}",
+                columnInfos.stream().map(Object::toString).collect(Collectors.joining(",")),
+                data.stream().map(Object::toString).collect(Collectors.joining(",")));
+    }
+}
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoJsonDeserializer.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoJsonDeserializer.java
new file mode 100644
index 00000000000..c842ebc6051
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoJsonDeserializer.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest.serde;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonToRowDataConverters;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.CollectionUtil;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.gateway.rest.serde.ResultInfo.FIELD_NAME_COLUMN_INFOS;
+import static org.apache.flink.table.gateway.rest.serde.ResultInfo.FIELD_NAME_DATA;
+import static org.apache.flink.table.gateway.rest.serde.ResultInfo.FIELD_NAME_FIELDS;
+import static org.apache.flink.table.gateway.rest.serde.ResultInfo.FIELD_NAME_KIND;
+
+/**
+ * Json deserializer for {@link ResultInfo}.
+ *
+ * @see ResultInfoJsonSerializer for the reverse operation.
+ */
+@Internal
+public class ResultInfoJsonDeserializer extends StdDeserializer<ResultInfo> {
+
+    private static final long serialVersionUID = 1L;
+
+    public ResultInfoJsonDeserializer() {
+        super(ResultInfo.class);
+    }
+
+    private static final JsonToRowDataConverters TO_ROW_DATA_CONVERTERS =
+            new JsonToRowDataConverters(false, false, TimestampFormat.ISO_8601);
+
+    @Override
+    public ResultInfo deserialize(JsonParser jsonParser, DeserializationContext ctx)
+            throws IOException {
+        JsonNode node = jsonParser.getCodec().readTree(jsonParser);
+
+        // deserialize ColumnInfos
+        List<ColumnInfo> columnInfos =
+                Arrays.asList(
+                        jsonParser
+                                .getCodec()
+                                .treeToValue(
+                                        node.get(FIELD_NAME_COLUMN_INFOS), ColumnInfo[].class));
+
+        // generate converters for all fields of each row
+        List<JsonToRowDataConverters.JsonToRowDataConverter> converters =
+                columnInfos.stream()
+                        .map(ColumnInfo::getLogicalType)
+                        .map(TO_ROW_DATA_CONVERTERS::createConverter)
+                        .collect(Collectors.toList());
+
+        // deserialize rows
+        List<RowData> data = deserializeData((ArrayNode) node.get(FIELD_NAME_DATA), converters);
+
+        return new ResultInfo(columnInfos, data);
+    }
+
+    private List<RowData> deserializeData(
+            ArrayNode serializedRows,
+            List<JsonToRowDataConverters.JsonToRowDataConverter> converters) {
+        List<RowData> data = new ArrayList<>();
+        serializedRows.forEach(rowDataNode -> data.add(convertToRowData(rowDataNode, converters)));
+        return data;
+    }
+
+    private GenericRowData convertToRowData(
+            JsonNode serializedRow,
+            List<JsonToRowDataConverters.JsonToRowDataConverter> converters) {
+        ArrayNode fieldsArrayNode = (ArrayNode) serializedRow.get(FIELD_NAME_FIELDS);
+        List<JsonNode> fieldNodes = CollectionUtil.iteratorToList(fieldsArrayNode.iterator());
+        return GenericRowData.ofKind(
+                RowKind.valueOf(serializedRow.get(FIELD_NAME_KIND).asText()),
+                IntStream.range(0, fieldNodes.size())
+                        .mapToObj(i -> converters.get(i).convert(fieldNodes.get(i)))
+                        .toArray());
+    }
+}
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoJsonSerializer.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoJsonSerializer.java
new file mode 100644
index 00000000000..762e634b39f
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoJsonSerializer.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest.serde;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonFormatOptions;
+import org.apache.flink.formats.json.RowDataToJsonConverters;
+import org.apache.flink.table.data.RowData;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.gateway.rest.serde.ResultInfo.FIELD_NAME_COLUMN_INFOS;
+import static org.apache.flink.table.gateway.rest.serde.ResultInfo.FIELD_NAME_DATA;
+import static org.apache.flink.table.gateway.rest.serde.ResultInfo.FIELD_NAME_FIELDS;
+import static org.apache.flink.table.gateway.rest.serde.ResultInfo.FIELD_NAME_KIND;
+
+/**
+ * Json serializer for {@link ResultInfo}.
+ *
+ * @see ResultInfoJsonDeserializer for the reverse operation.
+ */
+@Internal
+public class ResultInfoJsonSerializer extends StdSerializer<ResultInfo> {
+
+    private static final long serialVersionUID = 1L;
+
+    public ResultInfoJsonSerializer() {
+        super(ResultInfo.class);
+    }
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final RowDataToJsonConverters TO_JSON_CONVERTERS =
+            new RowDataToJsonConverters(
+                    TimestampFormat.ISO_8601, JsonFormatOptions.MapNullKeyMode.LITERAL, "");
+
+    @Override
+    public void serialize(
+            ResultInfo resultInfo,
+            JsonGenerator jsonGenerator,
+            SerializerProvider serializerProvider)
+            throws IOException {
+        jsonGenerator.writeStartObject();
+
+        // serialize ColumnInfos
+        serializerProvider.defaultSerializeField(
+                FIELD_NAME_COLUMN_INFOS, resultInfo.getColumnInfos(), jsonGenerator);
+
+        // serialize data
+        serializeData(resultInfo.getData(), buildToJsonConverters(resultInfo), jsonGenerator);
+
+        jsonGenerator.writeEndObject();
+    }
+
+    private void serializeData(
+            List<RowData> data,
+            List<Function<RowData, JsonNode>> converters,
+            JsonGenerator jsonGenerator)
+            throws IOException {
+        // format:
+        // data: [{"kind": "", "fields": []}, ...]
+        ArrayNode serializedData = OBJECT_MAPPER.createArrayNode();
+        serializedData.addAll(
+                data.stream()
+                        .map(rowData -> convertRowData(rowData, converters))
+                        .collect(Collectors.toList()));
+        jsonGenerator.writeFieldName(FIELD_NAME_DATA);
+        jsonGenerator.writeTree(serializedData);
+    }
+
+    private JsonNode convertRowData(RowData rowData, List<Function<RowData, JsonNode>> converters) {
+        ObjectNode serializedRowData = OBJECT_MAPPER.createObjectNode();
+        // kind
+        serializedRowData.put(FIELD_NAME_KIND, rowData.getRowKind().name());
+        // fields
+        ArrayNode fields = serializedRowData.putArray(FIELD_NAME_FIELDS);
+        fields.addAll(
+                converters.stream()
+                        .map(converter -> converter.apply(rowData))
+                        .collect(Collectors.toList()));
+
+        return serializedRowData;
+    }
+
+    /** Composes the FieldGetter and RowDataToJsonConverter. */
+    private List<Function<RowData, JsonNode>> buildToJsonConverters(ResultInfo resultInfo) {
+        List<RowDataToJsonConverters.RowDataToJsonConverter> converters =
+                resultInfo.getColumnInfos().stream()
+                        .map(ColumnInfo::getLogicalType)
+                        .map(TO_JSON_CONVERTERS::createConverter)
+                        .collect(Collectors.toList());
+
+        List<RowData.FieldGetter> fieldGetters = resultInfo.getFieldGetters();
+
+        return IntStream.range(0, converters.size())
+                .mapToObj(
+                        i ->
+                                (Function<RowData, JsonNode>)
+                                        rowData ->
+                                                converters
+                                                        .get(i)
+                                                        .convert(
+                                                                OBJECT_MAPPER,
+                                                                null,
+                                                                fieldGetters
+                                                                        .get(i)
+                                                                        .getFieldOrNull(rowData)))
+                .collect(Collectors.toList());
+    }
+}
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/RowDataInfo.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/RowDataInfo.java
deleted file mode 100644
index b041a5c6dd2..00000000000
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/RowDataInfo.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.gateway.rest.serde;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
-
-import java.util.List;
-
-/** A RowDataInfo info represents a {@link RowData}. */
-@Internal
-public class RowDataInfo {
-
-    private static final String FIELD_NAME_KIND = "kind";
-    private static final String FIELD_NAME_FIELDS = "fields";
-
-    @JsonProperty(FIELD_NAME_KIND)
-    private final String kind;
-
-    @JsonProperty(FIELD_NAME_FIELDS)
-    private final List<JsonNode> fields;
-
-    @JsonCreator
-    public RowDataInfo(
-            @JsonProperty(FIELD_NAME_KIND) String kind,
-            @JsonProperty(FIELD_NAME_FIELDS) List<JsonNode> fields) {
-        this.kind = Preconditions.checkNotNull(kind, "kind must not be null");
-        this.fields = Preconditions.checkNotNull(fields, "fields must not be null");
-    }
-
-    public String getKind() {
-        return kind;
-    }
-
-    public List<JsonNode> getFields() {
-        return fields;
-    }
-}
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointStatementITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointStatementITCase.java
index ebf721e930e..019bcdd8abd 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointStatementITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointStatementITCase.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.rest.RestClient;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.gateway.AbstractSqlGatewayStatementITCase;
 import org.apache.flink.table.gateway.api.operation.OperationHandle;
@@ -40,6 +41,7 @@ import org.apache.flink.table.gateway.rest.message.statement.ExecuteStatementReq
 import org.apache.flink.table.gateway.rest.message.statement.ExecuteStatementResponseBody;
 import org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody;
 import org.apache.flink.table.gateway.rest.message.statement.FetchResultsTokenParameters;
+import org.apache.flink.table.gateway.rest.serde.ResultInfo;
 import org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointExtension;
 import org.apache.flink.table.planner.functions.casting.RowDataToStringConverterImpl;
 import org.apache.flink.table.utils.DateTimeUtils;
@@ -139,20 +141,23 @@ class SqlGatewayRestEndpointStatementITCase extends AbstractSqlGatewayStatementI
         FetchResultsResponseBody fetchResultsResponseBody =
                 fetchResults(sessionHandle, operationHandle, 0L);
 
-        ResultSet resultSet = fetchResultsResponseBody.getResults();
+        ResultInfo resultInfo = fetchResultsResponseBody.getResults();
+        assertThat(resultInfo).isNotNull();
+
         String resultType = fetchResultsResponseBody.getResultType();
-        assertThat(resultSet).isNotNull();
         assertThat(
                         Arrays.asList(
                                 ResultSet.ResultType.PAYLOAD.name(),
                                 ResultSet.ResultType.EOS.name()))
                 .contains(resultType);
 
+        ResolvedSchema resultSchema = resultInfo.getResultSchema();
+
         return toString(
                 StatementType.match(statement),
-                resultSet.getResultSchema(),
+                resultSchema,
                 new RowDataToStringConverterImpl(
-                        resultSet.getResultSchema().toPhysicalRowDataType(),
+                        resultSchema.toPhysicalRowDataType(),
                         DateTimeUtils.UTC_ZONE.toZoneId(),
                         SqlGatewayRestEndpointStatementITCase.class.getClassLoader(),
                         false),
@@ -234,10 +239,11 @@ class SqlGatewayRestEndpointStatementITCase extends AbstractSqlGatewayStatementI
         private void fetch() throws Exception {
             FetchResultsResponseBody fetchResultsResponseBody =
                     fetchResults(sessionHandle, operationHandle, token);
+
             String nextResultUri = fetchResultsResponseBody.getNextResultUri();
-            ResultSet resultSet = fetchResultsResponseBody.getResults();
             token = parseTokenFromUri(nextResultUri);
-            fetchedRows = resultSet.getData().iterator();
+
+            fetchedRows = fetchResultsResponseBody.getResults().getData().iterator();
         }
     }
 
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/serde/JsonResultSetSerDeTest.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/serde/ResultInfoJsonSerDeTest.java
similarity index 58%
rename from flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/serde/JsonResultSetSerDeTest.java
rename to flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/serde/ResultInfoJsonSerDeTest.java
index eee260f6093..3541e75cdad 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/serde/JsonResultSetSerDeTest.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/serde/ResultInfoJsonSerDeTest.java
@@ -27,14 +27,19 @@ import org.apache.flink.table.gateway.api.results.ResultSet;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
 
+import org.apache.commons.io.IOUtils;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.math.BigDecimal;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
 import java.sql.Timestamp;
 import java.time.Instant;
 import java.time.LocalDate;
@@ -72,149 +77,108 @@ import static org.apache.flink.table.api.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZON
 import static org.apache.flink.table.api.DataTypes.TINYINT;
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Tests for {@link JsonResultSetSerializer} and {@link JsonResultSetDeserializer}. */
-class JsonResultSetSerDeTest {
+/** Tests for {@link ResultInfoJsonSerializer} and {@link ResultInfoJsonDeserializer}. */
+public class ResultInfoJsonSerDeTest {
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final Row testRow = initRow();
 
-    private static final byte tinyint = 'c';
-    private static final short smallint = 128;
-    private static final int intValue = 45536;
-    private static final float floatValue = 33.333F;
-    private static final long bigint = 1238123899121L;
-    private static final String name = "asdlkjasjkdla998y1122";
-    private static final byte[] bytes = new byte[1024];
-    private static final Double[] doubles = new Double[] {1.1, 2.2, 3.3};
-    private static final BigDecimal decimal = new BigDecimal("123.456789");
-    private static final LocalDate date = LocalDate.parse("1990-10-14");
-    private static final LocalTime time = LocalTime.parse("12:12:43");
-    private static final Timestamp timestamp3 = Timestamp.valueOf("1990-10-14 12:12:43.123");
-    private static final Timestamp timestamp9 = Timestamp.valueOf("1990-10-14 12:12:43.123456789");
-    private static final Instant timestampWithLocalZone =
-            LocalDateTime.of(1990, 10, 14, 12, 12, 43, 123456789)
-                    .atOffset(ZoneOffset.of("Z"))
-                    .toInstant();
-    private static final int ROW_NUMBER = 10;
-
-    private static final Map<String, Long> map = new HashMap<>();
-    private static final Map<String, Integer> multiSet = new HashMap<>();
-    private static final Map<String, Map<String, Integer>> nestedMap = new HashMap<>();
-    private static final Map<String, Integer> innerMap = new HashMap<>();
-
-    static {
-        map.put("element", 123L);
-        multiSet.put("element", 2);
-        innerMap.put("key", 234);
-        nestedMap.put("inner_map", innerMap);
-        ThreadLocalRandom.current().nextBytes(bytes);
+    @BeforeAll
+    public static void setUp() {
+        SimpleModule simpleModule = new SimpleModule();
+        simpleModule.addSerializer(ResultInfo.class, new ResultInfoJsonSerializer());
+        simpleModule.addDeserializer(ResultInfo.class, new ResultInfoJsonDeserializer());
+        OBJECT_MAPPER.registerModule(simpleModule);
     }
 
     @Test
-    void testSerDeResultSetWithSingleRowData() throws Exception {
-        Row row = getTestRowData();
-        seDeResultSet(Collections.singletonList(row), getFields());
+    public void testResultInfoSerDeWithSingleRow() throws Exception {
+        serDeTest(Collections.singletonList(testRow));
     }
 
     @Test
-    void testSerDeResultSetWithMultiRowData() throws Exception {
-        ArrayList<Row> rowList = new ArrayList<>();
-        for (int i = 0; i < ROW_NUMBER; ++i) {
-            rowList.add(getTestRowData());
+    public void testResultInfoSerDeWithMultiRowData() throws Exception {
+        List<Row> rows = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            rows.add(testRow);
         }
-        seDeResultSet(rowList, getFields());
+        serDeTest(rows);
     }
 
     @Test
-    void testSerDeResultSetWithRowDataNullValues() throws Exception {
-        List<Row> rowList = new ArrayList<>();
+    public void testResultInfoSerDeWithNullValues() throws Exception {
+        List<Row> rows = new ArrayList<>();
         List<Integer> positions = new ArrayList<>();
-        for (int i = 0; i < 18; ++i) {
+        for (int i = 0; i < 18; i++) {
             positions.add(new Random().nextInt(18));
         }
-        for (int i = 0; i < ROW_NUMBER; ++i) {
-            rowList.add(getTestRowDataWithNullValues(positions));
+        for (int i = 0; i < 10; i++) {
+            rows.add(getTestRowDataWithNullValues(initRow(), positions));
         }
-        seDeResultSet(rowList, getFields());
+        serDeTest(rows);
     }
 
-    void seDeResultSet(List<Row> rowList, List<DataTypes.Field> fields) throws IOException {
+    @Test
+    public void testDeserializationFromJson() throws Exception {
+        URL url = ResultInfoJsonSerDeTest.class.getClassLoader().getResource("resultInfo.txt");
+        String input =
+                IOUtils.toString(Preconditions.checkNotNull(url), StandardCharsets.UTF_8).trim();
+        ResultInfo deserializedResult = OBJECT_MAPPER.readValue(input, ResultInfo.class);
+        assertThat(OBJECT_MAPPER.writeValueAsString(deserializedResult)).isEqualTo(input);
+    }
+
+    private void serDeTest(List<Row> rows) throws IOException {
         List<RowData> rowDataList =
-                rowList.stream()
-                        .map(JsonResultSetSerDeTest::convertToInternal)
-                        .collect(Collectors.toList());
-        ResolvedSchema testResolvedSchema = getTestResolvedSchema(fields);
-        ResultSet testResultSet =
-                new ResultSet(ResultSet.ResultType.PAYLOAD, 0L, testResolvedSchema, rowDataList);
-        // Test serialization & deserialization
-        ObjectMapper objectMapper = new ObjectMapper();
-        SimpleModule resultSetModule = new SimpleModule();
-        resultSetModule.addSerializer(ResultSet.class, new JsonResultSetSerializer());
-        resultSetModule.addDeserializer(ResultSet.class, new JsonResultSetDeserializer());
-        objectMapper.registerModule(resultSetModule);
-        String result = objectMapper.writeValueAsString(testResultSet);
-        ResultSet resultSet = objectMapper.readValue(result, ResultSet.class);
-        List<RowData> deRowDataList = resultSet.getData();
-        for (int i = 0; i < deRowDataList.size(); ++i) {
-            assertThat(convertToExternal(deRowDataList.get(i), ROW(getFields())))
-                    .isEqualTo(rowList.get(i));
+                rows.stream().map(this::convertToInternal).collect(Collectors.toList());
+        ResolvedSchema testResolvedSchema = getTestResolvedSchema(getFields());
+        ResultInfo testResultInfo =
+                ResultInfo.createResultInfo(
+                        new ResultSet(
+                                ResultSet.ResultType.PAYLOAD, 0L, testResolvedSchema, rowDataList));
+
+        // test serialization & deserialization
+        String result = OBJECT_MAPPER.writeValueAsString(testResultInfo);
+        ResultInfo resultInfo = OBJECT_MAPPER.readValue(result, ResultInfo.class);
+
+        assertThat(resultInfo.getResultSchema().toString())
+                .isEqualTo(testResultInfo.getResultSchema().toString());
+
+        List<RowData> data = resultInfo.getData();
+        for (int i = 0; i < data.size(); i++) {
+            assertThat(convertToExternal(data.get(i), ROW(getFields()))).isEqualTo(rows.get(i));
         }
-        assertThat(resultSet.getResultSchema().toString())
-                .isEqualTo(testResultSet.getResultSchema().toString());
     }
 
-    private static ResolvedSchema getTestResolvedSchema(List<DataTypes.Field> fields) {
-        List<String> columnNames =
-                fields.stream().map(DataTypes.AbstractField::getName).collect(Collectors.toList());
-        List<DataType> columnDataTypes =
-                fields.stream().map(DataTypes.Field::getDataType).collect(Collectors.toList());
-        return ResolvedSchema.physical(columnNames, columnDataTypes);
-    }
+    private static Row initRow() {
+        final byte tinyint = 'c';
+        final short smallint = 128;
+        final int intValue = 45536;
+        final float floatValue = 33.333F;
+        final long bigint = 1238123899121L;
+        final String name = "asdlkjasjkdla998y1122";
+        final byte[] bytes = new byte[1024];
+        ThreadLocalRandom.current().nextBytes(bytes);
+        final Double[] doubles = new Double[] {1.1, 2.2, 3.3};
+        final BigDecimal decimal = new BigDecimal("123.456789");
+        final LocalDate date = LocalDate.parse("1990-10-14");
+        final LocalTime time = LocalTime.parse("12:12:43");
+        final Timestamp timestamp3 = Timestamp.valueOf("1990-10-14 12:12:43.123");
+        final Timestamp timestamp9 = Timestamp.valueOf("1990-10-14 12:12:43.123456789");
+        final Instant timestampWithLocalZone =
+                LocalDateTime.of(1990, 10, 14, 12, 12, 43, 123456789)
+                        .atOffset(ZoneOffset.of("Z"))
+                        .toInstant();
 
-    private static List<DataTypes.Field> getFields() {
-        return Arrays.asList(
-                FIELD("bool", BOOLEAN()),
-                FIELD("tinyint", TINYINT()),
-                FIELD("smallint", SMALLINT()),
-                FIELD("int", INT()),
-                FIELD("bigint", BIGINT()),
-                FIELD("float", FLOAT()),
-                FIELD("name", STRING()),
-                FIELD("bytes", BYTES()),
-                FIELD("decimal", DECIMAL(9, 6)),
-                FIELD("doubles", ARRAY(DOUBLE())),
-                FIELD("date", DATE()),
-                FIELD("time", TIME(0)),
-                FIELD("timestamp3", TIMESTAMP(3)),
-                FIELD("timestamp9", TIMESTAMP(9)),
-                FIELD("timestampWithLocalZone", TIMESTAMP_WITH_LOCAL_TIME_ZONE(9)),
-                FIELD("map", MAP(STRING(), BIGINT())),
-                FIELD("multiSet", MULTISET(STRING())),
-                FIELD("map2map", MAP(STRING(), MAP(STRING(), INT()))));
-    }
+        final Map<String, Long> map = new HashMap<>();
+        map.put("element", 123L);
 
-    private static Row getTestRowData() {
-        Row testRow = new Row(18);
-        setRandomKind(testRow);
-        testRow.setField(0, true);
-        testRow.setField(1, tinyint);
-        testRow.setField(2, smallint);
-        testRow.setField(3, intValue);
-        testRow.setField(4, bigint);
-        testRow.setField(5, floatValue);
-        testRow.setField(6, name);
-        testRow.setField(7, bytes);
-        testRow.setField(8, decimal);
-        testRow.setField(9, doubles);
-        testRow.setField(10, date);
-        testRow.setField(11, time);
-        testRow.setField(12, timestamp3.toLocalDateTime());
-        testRow.setField(13, timestamp9.toLocalDateTime());
-        testRow.setField(14, timestampWithLocalZone);
-        testRow.setField(15, map);
-        testRow.setField(16, multiSet);
-        testRow.setField(17, nestedMap);
-        return testRow;
-    }
+        final Map<String, Integer> multiSet = new HashMap<>();
+        multiSet.put("element", 2);
+
+        final Map<String, Map<String, Integer>> nestedMap = new HashMap<>();
+        Map<String, Integer> innerMap = new HashMap<>();
+        innerMap.put("key", 234);
+        nestedMap.put("inner_map", innerMap);
 
-    private static Row getTestRowDataWithNullValues(List<Integer> positions) {
         Row testRow = new Row(18);
         setRandomKind(testRow);
         testRow.setField(0, true);
@@ -235,9 +199,6 @@ class JsonResultSetSerDeTest {
         testRow.setField(15, map);
         testRow.setField(16, multiSet);
         testRow.setField(17, nestedMap);
-        for (int position : positions) {
-            testRow.setField(position, null);
-        }
         return testRow;
     }
 
@@ -259,13 +220,50 @@ class JsonResultSetSerDeTest {
         }
     }
 
+    private List<DataTypes.Field> getFields() {
+        return Arrays.asList(
+                FIELD("bool", BOOLEAN()),
+                FIELD("tinyint", TINYINT()),
+                FIELD("smallint", SMALLINT()),
+                FIELD("int", INT()),
+                FIELD("bigint", BIGINT()),
+                FIELD("float", FLOAT()),
+                FIELD("name", STRING()),
+                FIELD("bytes", BYTES()),
+                FIELD("decimal", DECIMAL(9, 6)),
+                FIELD("doubles", ARRAY(DOUBLE())),
+                FIELD("date", DATE()),
+                FIELD("time", TIME(0)),
+                FIELD("timestamp3", TIMESTAMP(3)),
+                FIELD("timestamp9", TIMESTAMP(9)),
+                FIELD("timestampWithLocalZone", TIMESTAMP_WITH_LOCAL_TIME_ZONE(9)),
+                FIELD("map", MAP(STRING(), BIGINT())),
+                FIELD("multiSet", MULTISET(STRING())),
+                FIELD("map2map", MAP(STRING(), MAP(STRING(), INT()))));
+    }
+
+    private ResolvedSchema getTestResolvedSchema(List<DataTypes.Field> fields) {
+        List<String> columnNames =
+                fields.stream().map(DataTypes.AbstractField::getName).collect(Collectors.toList());
+        List<DataType> columnDataTypes =
+                fields.stream().map(DataTypes.Field::getDataType).collect(Collectors.toList());
+        return ResolvedSchema.physical(columnNames, columnDataTypes);
+    }
+
+    private Row getTestRowDataWithNullValues(Row testRow, List<Integer> positions) {
+        for (int position : positions) {
+            testRow.setField(position, null);
+        }
+        return testRow;
+    }
+
     @SuppressWarnings("unchecked")
-    private static Row convertToExternal(RowData rowData, DataType dataType) {
+    private Row convertToExternal(RowData rowData, DataType dataType) {
         return (Row) DataFormatConverters.getConverterForDataType(dataType).toExternal(rowData);
     }
 
     @SuppressWarnings("unchecked")
-    private static GenericRowData convertToInternal(Row row) {
+    private GenericRowData convertToInternal(Row row) {
         DataFormatConverters.DataFormatConverter<GenericRowData, Row> converter =
                 DataFormatConverters.getConverterForDataType(ROW(getFields()));
         return converter.toInternal(row);
diff --git a/flink-table/flink-sql-gateway/src/test/resources/resultInfo.txt b/flink-table/flink-sql-gateway/src/test/resources/resultInfo.txt
new file mode 100644
index 00000000000..42b972a757e
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/test/resources/resultInfo.txt
@@ -0,0 +1 @@
+{"columns":[{"name":"bool","logicalType":{"type":"BOOLEAN","nullable":true},"comment":null},{"name":"tinyint","logicalType":{"type":"TINYINT","nullable":true},"comment":null},{"name":"smallint","logicalType":{"type":"SMALLINT","nullable":true},"comment":null},{"name":"int","logicalType":{"type":"INTEGER","nullable":true},"comment":null},{"name":"bigint","logicalType":{"type":"BIGINT","nullable":true},"comment":null},{"name":"float","logicalType":{"type":"FLOAT","nullable":true},"comment"
 [...]
diff --git a/pom.xml b/pom.xml
index 417c8c5028b..ab0a7000527 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1530,6 +1530,7 @@ under the License.
                                                <exclude>flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/*.java</exclude>
                                                <exclude>flink-formats/flink-parquet/src/test/resources/avro/**</exclude>
                                                <exclude>flink-formats/flink-parquet/src/test/resources/protobuf/**</exclude>
+                                               <exclude>flink-table/flink-sql-gateway/src/test/resources/*.txt</exclude>
                                                <!-- netty test file, still Apache License 2.0 but with a different header -->
                                                <exclude>flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/AbstractByteBufTest.java</exclude>
                                                <!-- Configuration Files. -->

Reply via email to