This is an automated email from the ASF dual-hosted git repository. shengkai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 95cb47ef53c [FLINK-30449][sql-gateway] Introduce ResultInfo to improve the serde of FetchResultsResponseBody (#21523) 95cb47ef53c is described below commit 95cb47ef53c822161ca90cdca43ad95fd1332633 Author: yuzelin <33053040+yuze...@users.noreply.github.com> AuthorDate: Tue Dec 27 09:56:32 2022 +0800 [FLINK-30449][sql-gateway] Introduce ResultInfo to improve the serde of FetchResultsResponseBody (#21523) This closes #21523 --- .../flink/table/gateway/api/results/ResultSet.java | 9 +- .../handler/statement/FetchResultsHandler.java | 4 +- ...nHeaders.java => AbstractOperationHeaders.java} | 2 +- .../header/operation/CancelOperationHeaders.java | 2 +- .../header/operation/CloseOperationHeaders.java | 2 +- .../operation/GetOperationStatusHeaders.java | 2 +- .../header/statement/ExecuteStatementHeaders.java | 2 +- .../operation/OperationHandleIdPathParameter.java | 2 +- .../statement/ExecuteStatementRequestBody.java | 2 +- .../statement/ExecuteStatementResponseBody.java | 2 +- .../statement/FetchResultsResponseBody.java | 20 +- .../statement/FetchResultsTokenParameters.java | 2 +- .../statement/FetchResultsTokenPathParameter.java | 2 +- .../flink/table/gateway/rest/serde/ColumnInfo.java | 14 +- .../rest/serde/JsonResultSetDeserializer.java | 130 ----------- .../rest/serde/JsonResultSetSerializer.java | 127 ----------- .../flink/table/gateway/rest/serde/ResultInfo.java | 111 ++++++++++ .../rest/serde/ResultInfoJsonDeserializer.java | 109 +++++++++ .../rest/serde/ResultInfoJsonSerializer.java | 138 ++++++++++++ .../table/gateway/rest/serde/RowDataInfo.java | 59 ----- .../SqlGatewayRestEndpointStatementITCase.java | 18 +- ...SerDeTest.java => ResultInfoJsonSerDeTest.java} | 246 ++++++++++----------- .../src/test/resources/resultInfo.txt | 1 + pom.xml | 1 + 24 files changed, 531 insertions(+), 476 deletions(-) diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/ResultSet.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/ResultSet.java index c13e7d82fbe..6e397e22193 100644 --- a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/ResultSet.java +++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/ResultSet.java @@ -25,7 +25,6 @@ import org.apache.flink.table.data.RowData; import javax.annotation.Nullable; -import java.io.Serializable; import java.util.Collections; import java.util.List; import java.util.Objects; @@ -33,13 +32,7 @@ import java.util.stream.Collectors; /** The collection of the results. */ @PublicEvolving -public class ResultSet implements Serializable { - - private static final long serialVersionUID = 1L; - - public static final String FIELD_NAME_COLUMN_INFOS = "columns"; - - public static final String FIELD_NAME_DATA = "data"; +public class ResultSet { private final ResultType resultType; diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/statement/FetchResultsHandler.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/statement/FetchResultsHandler.java index c8945974cef..52b18be0bed 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/statement/FetchResultsHandler.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/statement/FetchResultsHandler.java @@ -33,6 +33,7 @@ import org.apache.flink.table.gateway.rest.message.session.SessionHandleIdPathPa import org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody; import org.apache.flink.table.gateway.rest.message.statement.FetchResultsTokenParameters; import org.apache.flink.table.gateway.rest.message.statement.FetchResultsTokenPathParameter; +import org.apache.flink.table.gateway.rest.serde.ResultInfo; import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion; import javax.annotation.Nonnull; @@ -86,6 +87,7 @@ public class FetchResultsHandler nextToken); return CompletableFuture.completedFuture( - new FetchResultsResponseBody(resultSet, resultType, nextResultUri)); + new FetchResultsResponseBody( + ResultInfo.createResultInfo(resultSet), resultType, nextResultUri)); } } diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/operation/AbstactOperationHeaders.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/operation/AbstractOperationHeaders.java similarity index 97% rename from flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/operation/AbstactOperationHeaders.java rename to flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/operation/AbstractOperationHeaders.java index 22d3ac07cb4..b842ff64241 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/operation/AbstactOperationHeaders.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/operation/AbstractOperationHeaders.java @@ -26,7 +26,7 @@ import org.apache.flink.table.gateway.rest.message.operation.OperationStatusResp import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; /** Abstract operation related message headers. */ -public abstract class AbstactOperationHeaders +public abstract class AbstractOperationHeaders implements SqlGatewayMessageHeaders< EmptyRequestBody, OperationStatusResponseBody, OperationMessageParameters> { diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/operation/CancelOperationHeaders.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/operation/CancelOperationHeaders.java index e25a31c6fbe..958778ae9fc 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/operation/CancelOperationHeaders.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/operation/CancelOperationHeaders.java @@ -23,7 +23,7 @@ import org.apache.flink.table.gateway.rest.message.operation.OperationHandleIdPa import org.apache.flink.table.gateway.rest.message.session.SessionHandleIdPathParameter; /** Message headers for canceling operation. */ -public class CancelOperationHeaders extends AbstactOperationHeaders { +public class CancelOperationHeaders extends AbstractOperationHeaders { private static final CancelOperationHeaders INSTANCE = new CancelOperationHeaders(); diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/operation/CloseOperationHeaders.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/operation/CloseOperationHeaders.java index 1b9ae4ea4ec..c7b53888626 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/operation/CloseOperationHeaders.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/operation/CloseOperationHeaders.java @@ -23,7 +23,7 @@ import org.apache.flink.table.gateway.rest.message.operation.OperationHandleIdPa import org.apache.flink.table.gateway.rest.message.session.SessionHandleIdPathParameter; /** Message headers for closing operation. */ -public class CloseOperationHeaders extends AbstactOperationHeaders { +public class CloseOperationHeaders extends AbstractOperationHeaders { private static final CloseOperationHeaders INSTANCE = new CloseOperationHeaders(); diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/operation/GetOperationStatusHeaders.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/operation/GetOperationStatusHeaders.java index f05d1538667..791b9294067 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/operation/GetOperationStatusHeaders.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/operation/GetOperationStatusHeaders.java @@ -23,7 +23,7 @@ import org.apache.flink.table.gateway.rest.message.operation.OperationHandleIdPa import org.apache.flink.table.gateway.rest.message.session.SessionHandleIdPathParameter; /** Message headers for getting the status of operation. */ -public class GetOperationStatusHeaders extends AbstactOperationHeaders { +public class GetOperationStatusHeaders extends AbstractOperationHeaders { private static final GetOperationStatusHeaders INSTANCE = new GetOperationStatusHeaders(); diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/statement/ExecuteStatementHeaders.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/statement/ExecuteStatementHeaders.java index fb30d3fb83a..5c36745edd9 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/statement/ExecuteStatementHeaders.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/statement/ExecuteStatementHeaders.java @@ -27,7 +27,7 @@ import org.apache.flink.table.gateway.rest.message.statement.ExecuteStatementRes import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; -/** Message headers for execute a statement. */ +/** Message headers for executing a statement. */ public class ExecuteStatementHeaders implements SqlGatewayMessageHeaders< ExecuteStatementRequestBody, diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/operation/OperationHandleIdPathParameter.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/operation/OperationHandleIdPathParameter.java index 26df625210a..f3e97a66701 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/operation/OperationHandleIdPathParameter.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/operation/OperationHandleIdPathParameter.java @@ -23,7 +23,7 @@ import org.apache.flink.table.gateway.api.operation.OperationHandle; import java.util.UUID; -/** {@link MessagePathParameter} that parse the {@link OperationHandle}. */ +/** {@link MessagePathParameter} that parses the {@link OperationHandle}. */ public class OperationHandleIdPathParameter extends MessagePathParameter<OperationHandle> { public static final String KEY = "operation_handle"; diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/ExecuteStatementRequestBody.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/ExecuteStatementRequestBody.java index 6b0ac225cfa..00e05b18bfe 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/ExecuteStatementRequestBody.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/ExecuteStatementRequestBody.java @@ -27,7 +27,7 @@ import javax.annotation.Nullable; import java.util.Map; -/** {@link RequestBody} for execute a statement. */ +/** {@link RequestBody} for executing a statement. */ @JsonInclude(JsonInclude.Include.NON_NULL) public class ExecuteStatementRequestBody implements RequestBody { diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/ExecuteStatementResponseBody.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/ExecuteStatementResponseBody.java index 80faaf5986d..086bc2bf5ce 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/ExecuteStatementResponseBody.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/ExecuteStatementResponseBody.java @@ -23,7 +23,7 @@ import org.apache.flink.runtime.rest.messages.ResponseBody; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; -/** {@link ResponseBody} for execute a statement. */ +/** {@link ResponseBody} for executing a statement. */ @JsonInclude(JsonInclude.Include.NON_NULL) public class ExecuteStatementResponseBody implements ResponseBody { diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultsResponseBody.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultsResponseBody.java index bff14c98b40..ccdaee00731 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultsResponseBody.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultsResponseBody.java @@ -19,17 +19,18 @@ package org.apache.flink.table.gateway.rest.message.statement; import org.apache.flink.runtime.rest.messages.ResponseBody; -import org.apache.flink.table.gateway.api.results.ResultSet; -import org.apache.flink.table.gateway.rest.serde.JsonResultSetDeserializer; -import org.apache.flink.table.gateway.rest.serde.JsonResultSetSerializer; +import org.apache.flink.table.gateway.rest.serde.ResultInfo; +import org.apache.flink.table.gateway.rest.serde.ResultInfoJsonDeserializer; +import org.apache.flink.table.gateway.rest.serde.ResultInfoJsonSerializer; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize; import javax.annotation.Nullable; -/** {@link ResponseBody} for execute a statement. */ +/** {@link ResponseBody} for executing a statement. */ public class FetchResultsResponseBody implements ResponseBody { private static final String FIELD_RESULT_TYPE = "resultType"; @@ -37,9 +38,9 @@ public class FetchResultsResponseBody implements ResponseBody { private static final String FIELD_NEXT_RESULT_URI = "nextResultUri"; @JsonProperty(FIELD_RESULTS) - @JsonSerialize(using = JsonResultSetSerializer.class) - @JsonDeserialize(using = JsonResultSetDeserializer.class) - private final ResultSet results; + @JsonSerialize(using = ResultInfoJsonSerializer.class) + @JsonDeserialize(using = ResultInfoJsonDeserializer.class) + private final ResultInfo results; @JsonProperty(FIELD_RESULT_TYPE) private final String resultType; @@ -48,8 +49,9 @@ public class FetchResultsResponseBody implements ResponseBody { @Nullable private final String nextResultUri; + @JsonCreator public FetchResultsResponseBody( - @JsonProperty(FIELD_RESULTS) ResultSet results, + @JsonProperty(FIELD_RESULTS) ResultInfo results, @JsonProperty(FIELD_RESULT_TYPE) String resultType, @Nullable @JsonProperty(FIELD_NEXT_RESULT_URI) String nextResultUri) { this.results = results; @@ -57,7 +59,7 @@ public class FetchResultsResponseBody implements ResponseBody { this.nextResultUri = nextResultUri; } - public ResultSet getResults() { + public ResultInfo getResults() { return results; } diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultsTokenParameters.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultsTokenParameters.java index aa014c44464..132dc631a7b 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultsTokenParameters.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultsTokenParameters.java @@ -30,7 +30,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; -/** {@link MessagePathParameter} for fetch results. */ +/** {@link MessagePathParameter} for fetching results. */ public class FetchResultsTokenParameters extends MessageParameters { private final SessionHandleIdPathParameter sessionHandleIdPathParameter = diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultsTokenPathParameter.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultsTokenPathParameter.java index f3cb420585c..c91fe56ba98 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultsTokenPathParameter.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultsTokenPathParameter.java @@ -20,7 +20,7 @@ package org.apache.flink.table.gateway.rest.message.statement; import org.apache.flink.runtime.rest.messages.MessagePathParameter; -/** {@link MessagePathParameter} that parse the token string. */ +/** {@link MessagePathParameter} that parses the token string. */ public class FetchResultsTokenPathParameter extends MessagePathParameter<Long> { public static final String KEY = "token"; diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ColumnInfo.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ColumnInfo.java index d07218430c0..658bdc1854c 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ColumnInfo.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ColumnInfo.java @@ -19,7 +19,9 @@ package org.apache.flink.table.gateway.rest.serde; import org.apache.flink.annotation.Internal; +import org.apache.flink.table.catalog.Column; import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.utils.DataTypeUtils; import org.apache.flink.util.Preconditions; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; @@ -60,8 +62,16 @@ public class ColumnInfo { this.comment = comment; } - public static ColumnInfo create(String name, LogicalType type, @Nullable String comment) { - return new ColumnInfo(name, type, comment); + public static ColumnInfo toColumnInfo(Column column) { + return new ColumnInfo( + column.getName(), + column.getDataType().getLogicalType(), + column.getComment().orElse(null)); + } + + public Column toColumn() { + return Column.physical(name, DataTypeUtils.toInternalDataType(logicalType)) + .withComment(comment); } public String getName() { diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/JsonResultSetDeserializer.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/JsonResultSetDeserializer.java deleted file mode 100644 index 77c6c8976ce..00000000000 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/JsonResultSetDeserializer.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.gateway.rest.serde; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.formats.common.TimestampFormat; -import org.apache.flink.formats.json.JsonToRowDataConverters; -import org.apache.flink.table.catalog.Column; -import org.apache.flink.table.catalog.ResolvedSchema; -import org.apache.flink.table.data.GenericRowData; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.data.binary.BinaryStringData; -import org.apache.flink.table.gateway.api.results.ResultSet; -import org.apache.flink.table.types.DataType; -import org.apache.flink.table.types.logical.LogicalType; -import org.apache.flink.table.types.utils.DataTypeUtils; -import org.apache.flink.types.RowKind; - -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.function.Function; -import java.util.stream.Collectors; - -import static org.apache.flink.table.gateway.api.results.ResultSet.FIELD_NAME_COLUMN_INFOS; -import static org.apache.flink.table.gateway.api.results.ResultSet.FIELD_NAME_DATA; - -/** Json deserializer for {@link ResultSet}. */ -@Internal -public class JsonResultSetDeserializer extends StdDeserializer<ResultSet> { - - private static final long serialVersionUID = 1L; - - public JsonResultSetDeserializer() { - super(ResultSet.class); - } - - private static final JsonToRowDataConverters TO_ROWDATA_CONVERTERS = - new JsonToRowDataConverters(false, false, TimestampFormat.ISO_8601); - - @Override - public ResultSet deserialize(JsonParser jsonParser, DeserializationContext ctx) - throws IOException { - JsonNode node = jsonParser.getCodec().readTree(jsonParser); - ResolvedSchema resolvedSchema; - List<RowData> data = new ArrayList<>(); - - // Deserialize column infos - ColumnInfo[] columnInfos = - jsonParser - .getCodec() - .treeToValue(node.get(FIELD_NAME_COLUMN_INFOS), ColumnInfo[].class); - List<Column> columns = new ArrayList<>(); - for (ColumnInfo columnInfo : columnInfos) { - LogicalType logicalType = columnInfo.getLogicalType(); - columns.add( - Column.physical( - columnInfo.getName(), DataTypeUtils.toInternalDataType(logicalType))); - } - - // Parse the schema from Column - resolvedSchema = ResolvedSchema.of(columns); - // The fieldType for all RowData - List<LogicalType> fieldTypes = - resolvedSchema.getColumnDataTypes().stream() - .map(DataType::getLogicalType) - .collect(Collectors.toList()); - - // Generate converters for all fieldTypes - List<JsonToRowDataConverters.JsonToRowDataConverter> converters = - fieldTypes.stream() - .map(TO_ROWDATA_CONVERTERS::createConverter) - .collect(Collectors.toList()); - - // Get the RowDataInfo - JsonParser dataParser = node.get(FIELD_NAME_DATA).traverse(); - dataParser.nextToken(); - RowDataInfo[] rowDataInfos = ctx.readValue(dataParser, RowDataInfo[].class); - - // Parse the RowData from RowDataInfo - for (RowDataInfo rowDataInfo : rowDataInfos) { - RowKind rowKind = RowKind.valueOf(rowDataInfo.getKind()); - GenericRowData rowData = new GenericRowData(rowKind, rowDataInfo.getFields().size()); - List<JsonNode> fields = rowDataInfo.getFields(); - // Setting fields of one RowData - for (int i = 0; i < rowData.getArity(); ++i) { - JsonNode jsonNode = fields.get(i); - JsonToRowDataConverters.JsonToRowDataConverter converter = converters.get(i); - Object object = buildObjectValueConverter(converter).apply(jsonNode); - if (object != null && object.toString().equals("null")) { - rowData.setField(i, new BinaryStringData("")); - } else { - rowData.setField(i, object); - } - } - data.add(rowData); - } - - // The placeholder is used to build a ResultSet - ResultSet.ResultType resultTypePlaceHolder = ResultSet.ResultType.PAYLOAD; - Long nextTokenPlaceHolder = 0L; - return new ResultSet(resultTypePlaceHolder, nextTokenPlaceHolder, resolvedSchema, data); - } - - private static Function<JsonNode, Object> buildObjectValueConverter( - JsonToRowDataConverters.JsonToRowDataConverter converter) { - return converter::convert; - } -} diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/JsonResultSetSerializer.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/JsonResultSetSerializer.java deleted file mode 100644 index 3bdb1687058..00000000000 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/JsonResultSetSerializer.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.gateway.rest.serde; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.formats.common.TimestampFormat; -import org.apache.flink.formats.json.JsonFormatOptions; -import org.apache.flink.formats.json.RowDataToJsonConverters; -import org.apache.flink.table.catalog.Column; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.gateway.api.results.ResultSet; -import org.apache.flink.table.types.DataType; -import org.apache.flink.table.types.logical.LogicalType; -import org.apache.flink.types.RowKind; - -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.function.Function; -import java.util.stream.Collectors; - -import static org.apache.flink.table.gateway.api.results.ResultSet.FIELD_NAME_COLUMN_INFOS; -import static org.apache.flink.table.gateway.api.results.ResultSet.FIELD_NAME_DATA; - -/** Json serializer for {@link ResultSet}. */ -@Internal -public class JsonResultSetSerializer extends StdSerializer<ResultSet> { - - private static final long serialVersionUID = 1L; - - public JsonResultSetSerializer() { - super(ResultSet.class); - } - - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private static final RowDataToJsonConverters TO_JSON_CONVERTERS = - new RowDataToJsonConverters( - TimestampFormat.ISO_8601, JsonFormatOptions.MapNullKeyMode.LITERAL, "null"); - - @Override - public void serialize( - ResultSet resultSet, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) - throws IOException { - jsonGenerator.writeStartObject(); - - // 1.Serialize ColumnInfo - List<Column> columns = resultSet.getResultSchema().getColumns(); - List<ColumnInfo> columnInfos = new ArrayList<>(); - for (Column column : columns) { - columnInfos.add( - new ColumnInfo( - column.getName(), - column.getDataType().getLogicalType(), - column.getComment().orElse(null))); - } - serializerProvider.defaultSerializeField( - FIELD_NAME_COLUMN_INFOS, columnInfos, jsonGenerator); - - // 2.Serialize RowData - - // The fieldGetters for all RowData - List<RowData.FieldGetter> fieldGetters = new ArrayList<>(); - for (int i = 0; i < resultSet.getResultSchema().getColumnCount(); i++) { - fieldGetters.add( - RowData.createFieldGetter( - resultSet - .getResultSchema() - .getColumnDataTypes() - .get(i) - .getLogicalType(), - i)); - } - - // The fieldType for all RowData - List<LogicalType> fieldTypes = - resultSet.getResultSchema().getColumnDataTypes().stream() - .map(DataType::getLogicalType) - .collect(Collectors.toList()); - - // Generate converters for all fieldTypes - List<RowDataToJsonConverters.RowDataToJsonConverter> converters = - fieldTypes.stream() - .map(TO_JSON_CONVERTERS::createConverter) - .collect(Collectors.toList()); - List<RowDataInfo> data = new ArrayList<>(); - for (RowData row : resultSet.getData()) { - RowKind rowKind = row.getRowKind(); - List<JsonNode> fields = new ArrayList<>(); - for (int i = 0; i < row.getArity(); ++i) { - Object field = fieldGetters.get(i).getFieldOrNull(row); - RowDataToJsonConverters.RowDataToJsonConverter converter = converters.get(i); - fields.add(buildJsonValueConverter(converter).apply(field)); - } - data.add(new RowDataInfo(rowKind.name(), fields)); - } - - serializerProvider.defaultSerializeField(FIELD_NAME_DATA, data, jsonGenerator); - jsonGenerator.writeEndObject(); - } - - private static Function<Object, JsonNode> buildJsonValueConverter( - RowDataToJsonConverters.RowDataToJsonConverter converter) { - return field -> converter.convert(OBJECT_MAPPER, null, field); - } -} diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfo.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfo.java new file mode 100644 index 00000000000..c7ec5b05ff9 --- /dev/null +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfo.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.rest.serde; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.gateway.api.results.ResultSet; +import org.apache.flink.table.types.logical.LogicalType; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * A {@code ResultInfo} contains information of a {@link ResultSet}. It is designed for transferring + * the information of ResultSet via REST. For its serialization and deserialization, See: + * + * <p>{@link ResultInfoJsonSerializer} and {@link ResultInfoJsonDeserializer} + */ +@Internal +public class ResultInfo { + + // Columns + public static final String FIELD_NAME_COLUMN_INFOS = "columns"; + + // RowData + public static final String FIELD_NAME_DATA = "data"; + public static final String FIELD_NAME_KIND = "kind"; + public static final String FIELD_NAME_FIELDS = "fields"; + + private final List<ColumnInfo> columnInfos; + private final List<RowData> data; + + public ResultInfo(List<ColumnInfo> columnInfos, List<RowData> data) { + this.columnInfos = columnInfos; + this.data = data; + } + + public static ResultInfo createResultInfo(ResultSet resultSet) { + return new ResultInfo( + resultSet.getResultSchema().getColumns().stream() + .map(ColumnInfo::toColumnInfo) + .collect(Collectors.toList()), + resultSet.getData()); + } + + public List<ColumnInfo> getColumnInfos() { + return Collections.unmodifiableList(columnInfos); + } + + public List<RowData> getData() { + return data; + } + + public List<RowData.FieldGetter> getFieldGetters() { + List<LogicalType> columnTypes = + columnInfos.stream().map(ColumnInfo::getLogicalType).collect(Collectors.toList()); + return IntStream.range(0, columnTypes.size()) + .mapToObj(i -> RowData.createFieldGetter(columnTypes.get(i), i)) + .collect(Collectors.toList()); + } + + public ResolvedSchema getResultSchema() { + return ResolvedSchema.of( + columnInfos.stream().map(ColumnInfo::toColumn).collect(Collectors.toList())); + } + + @Override + public int hashCode() { + return Objects.hash(columnInfos, data); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof ResultInfo)) { + return false; + } + ResultInfo that = (ResultInfo) o; + return Objects.equals(columnInfos, that.columnInfos) && Objects.equals(data, that.data); + } + + @Override + public String toString() { + return String.format( + "ResultInfo{\n columnInfos=[%s],\n rows=[%s]\n}", + columnInfos.stream().map(Object::toString).collect(Collectors.joining(",")), + data.stream().map(Object::toString).collect(Collectors.joining(","))); + } +} diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoJsonDeserializer.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoJsonDeserializer.java new file mode 100644 index 00000000000..c842ebc6051 --- /dev/null +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoJsonDeserializer.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.rest.serde; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.formats.common.TimestampFormat; +import org.apache.flink.formats.json.JsonToRowDataConverters; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.CollectionUtil; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.flink.table.gateway.rest.serde.ResultInfo.FIELD_NAME_COLUMN_INFOS; +import static org.apache.flink.table.gateway.rest.serde.ResultInfo.FIELD_NAME_DATA; +import static org.apache.flink.table.gateway.rest.serde.ResultInfo.FIELD_NAME_FIELDS; +import static org.apache.flink.table.gateway.rest.serde.ResultInfo.FIELD_NAME_KIND; + +/** + * Json deserializer for {@link ResultInfo}. + * + * @see ResultInfoJsonSerializer for the reverse operation. + */ +@Internal +public class ResultInfoJsonDeserializer extends StdDeserializer<ResultInfo> { + + private static final long serialVersionUID = 1L; + + public ResultInfoJsonDeserializer() { + super(ResultInfo.class); + } + + private static final JsonToRowDataConverters TO_ROW_DATA_CONVERTERS = + new JsonToRowDataConverters(false, false, TimestampFormat.ISO_8601); + + @Override + public ResultInfo deserialize(JsonParser jsonParser, DeserializationContext ctx) + throws IOException { + JsonNode node = jsonParser.getCodec().readTree(jsonParser); + + // deserialize ColumnInfos + List<ColumnInfo> columnInfos = + Arrays.asList( + jsonParser + .getCodec() + .treeToValue( + node.get(FIELD_NAME_COLUMN_INFOS), ColumnInfo[].class)); + + // generate converters for all fields of each row + List<JsonToRowDataConverters.JsonToRowDataConverter> converters = + columnInfos.stream() + .map(ColumnInfo::getLogicalType) + .map(TO_ROW_DATA_CONVERTERS::createConverter) + .collect(Collectors.toList()); + + // deserialize rows + List<RowData> data = deserializeData((ArrayNode) node.get(FIELD_NAME_DATA), converters); + + return new ResultInfo(columnInfos, data); + } + + private List<RowData> deserializeData( + ArrayNode serializedRows, + List<JsonToRowDataConverters.JsonToRowDataConverter> converters) { + List<RowData> data = new ArrayList<>(); + serializedRows.forEach(rowDataNode -> data.add(convertToRowData(rowDataNode, converters))); + return data; + } + + private GenericRowData convertToRowData( + JsonNode serializedRow, + List<JsonToRowDataConverters.JsonToRowDataConverter> converters) { + ArrayNode fieldsArrayNode = (ArrayNode) serializedRow.get(FIELD_NAME_FIELDS); + List<JsonNode> fieldNodes = CollectionUtil.iteratorToList(fieldsArrayNode.iterator()); + return GenericRowData.ofKind( + RowKind.valueOf(serializedRow.get(FIELD_NAME_KIND).asText()), + IntStream.range(0, fieldNodes.size()) + .mapToObj(i -> converters.get(i).convert(fieldNodes.get(i))) + .toArray()); + } +} diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoJsonSerializer.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoJsonSerializer.java new file mode 100644 index 00000000000..762e634b39f --- /dev/null +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoJsonSerializer.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.rest.serde; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.formats.common.TimestampFormat; +import org.apache.flink.formats.json.JsonFormatOptions; +import org.apache.flink.formats.json.RowDataToJsonConverters; +import org.apache.flink.table.data.RowData; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer; + +import java.io.IOException; +import java.util.List; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.flink.table.gateway.rest.serde.ResultInfo.FIELD_NAME_COLUMN_INFOS; +import static org.apache.flink.table.gateway.rest.serde.ResultInfo.FIELD_NAME_DATA; +import static org.apache.flink.table.gateway.rest.serde.ResultInfo.FIELD_NAME_FIELDS; +import static org.apache.flink.table.gateway.rest.serde.ResultInfo.FIELD_NAME_KIND; + +/** + * Json serializer for {@link ResultInfo}. + * + * @see ResultInfoJsonDeserializer for the reverse operation. + */ +@Internal +public class ResultInfoJsonSerializer extends StdSerializer<ResultInfo> { + + private static final long serialVersionUID = 1L; + + public ResultInfoJsonSerializer() { + super(ResultInfo.class); + } + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final RowDataToJsonConverters TO_JSON_CONVERTERS = + new RowDataToJsonConverters( + TimestampFormat.ISO_8601, JsonFormatOptions.MapNullKeyMode.LITERAL, ""); + + @Override + public void serialize( + ResultInfo resultInfo, + JsonGenerator jsonGenerator, + SerializerProvider serializerProvider) + throws IOException { + jsonGenerator.writeStartObject(); + + // serialize ColumnInfos + serializerProvider.defaultSerializeField( + FIELD_NAME_COLUMN_INFOS, resultInfo.getColumnInfos(), jsonGenerator); + + // serialize data + serializeData(resultInfo.getData(), buildToJsonConverters(resultInfo), jsonGenerator); + + jsonGenerator.writeEndObject(); + } + + private void serializeData( + List<RowData> data, + List<Function<RowData, JsonNode>> converters, + JsonGenerator jsonGenerator) + throws IOException { + // format: + // data: [{"kind": "", "fields": []}, ...] + ArrayNode serializedData = OBJECT_MAPPER.createArrayNode(); + serializedData.addAll( + data.stream() + .map(rowData -> convertRowData(rowData, converters)) + .collect(Collectors.toList())); + jsonGenerator.writeFieldName(FIELD_NAME_DATA); + jsonGenerator.writeTree(serializedData); + } + + private JsonNode convertRowData(RowData rowData, List<Function<RowData, JsonNode>> converters) { + ObjectNode serializedRowData = OBJECT_MAPPER.createObjectNode(); + // kind + serializedRowData.put(FIELD_NAME_KIND, rowData.getRowKind().name()); + // fields + ArrayNode fields = serializedRowData.putArray(FIELD_NAME_FIELDS); + fields.addAll( + converters.stream() + .map(converter -> converter.apply(rowData)) + .collect(Collectors.toList())); + + return serializedRowData; + } + + /** Composes the FieldGetter and RowDataToJsonConverter. */ + private List<Function<RowData, JsonNode>> buildToJsonConverters(ResultInfo resultInfo) { + List<RowDataToJsonConverters.RowDataToJsonConverter> converters = + resultInfo.getColumnInfos().stream() + .map(ColumnInfo::getLogicalType) + .map(TO_JSON_CONVERTERS::createConverter) + .collect(Collectors.toList()); + + List<RowData.FieldGetter> fieldGetters = resultInfo.getFieldGetters(); + + return IntStream.range(0, converters.size()) + .mapToObj( + i -> + (Function<RowData, JsonNode>) + rowData -> + converters + .get(i) + .convert( + OBJECT_MAPPER, + null, + fieldGetters + .get(i) + .getFieldOrNull(rowData))) + .collect(Collectors.toList()); + } +} diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/RowDataInfo.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/RowDataInfo.java deleted file mode 100644 index b041a5c6dd2..00000000000 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/RowDataInfo.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.gateway.rest.serde; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.table.data.RowData; -import org.apache.flink.util.Preconditions; - -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; - -import java.util.List; - -/** A RowDataInfo info represents a {@link RowData}. */ -@Internal -public class RowDataInfo { - - private static final String FIELD_NAME_KIND = "kind"; - private static final String FIELD_NAME_FIELDS = "fields"; - - @JsonProperty(FIELD_NAME_KIND) - private final String kind; - - @JsonProperty(FIELD_NAME_FIELDS) - private final List<JsonNode> fields; - - @JsonCreator - public RowDataInfo( - @JsonProperty(FIELD_NAME_KIND) String kind, - @JsonProperty(FIELD_NAME_FIELDS) List<JsonNode> fields) { - this.kind = Preconditions.checkNotNull(kind, "kind must not be null"); - this.fields = Preconditions.checkNotNull(fields, "fields must not be null"); - } - - public String getKind() { - return kind; - } - - public List<JsonNode> getFields() { - return fields; - } -} diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointStatementITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointStatementITCase.java index ebf721e930e..019bcdd8abd 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointStatementITCase.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointStatementITCase.java @@ -24,6 +24,7 @@ import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.runtime.rest.RestClient; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.data.RowData; import org.apache.flink.table.gateway.AbstractSqlGatewayStatementITCase; import org.apache.flink.table.gateway.api.operation.OperationHandle; @@ -40,6 +41,7 @@ import org.apache.flink.table.gateway.rest.message.statement.ExecuteStatementReq import org.apache.flink.table.gateway.rest.message.statement.ExecuteStatementResponseBody; import org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody; import org.apache.flink.table.gateway.rest.message.statement.FetchResultsTokenParameters; +import org.apache.flink.table.gateway.rest.serde.ResultInfo; import org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointExtension; import org.apache.flink.table.planner.functions.casting.RowDataToStringConverterImpl; import org.apache.flink.table.utils.DateTimeUtils; @@ -139,20 +141,23 @@ class SqlGatewayRestEndpointStatementITCase extends AbstractSqlGatewayStatementI FetchResultsResponseBody fetchResultsResponseBody = fetchResults(sessionHandle, operationHandle, 0L); - ResultSet resultSet = fetchResultsResponseBody.getResults(); + ResultInfo resultInfo = fetchResultsResponseBody.getResults(); + assertThat(resultInfo).isNotNull(); + String resultType = fetchResultsResponseBody.getResultType(); - assertThat(resultSet).isNotNull(); assertThat( Arrays.asList( ResultSet.ResultType.PAYLOAD.name(), ResultSet.ResultType.EOS.name())) .contains(resultType); + ResolvedSchema resultSchema = resultInfo.getResultSchema(); + return toString( StatementType.match(statement), - resultSet.getResultSchema(), + resultSchema, new RowDataToStringConverterImpl( - resultSet.getResultSchema().toPhysicalRowDataType(), + resultSchema.toPhysicalRowDataType(), DateTimeUtils.UTC_ZONE.toZoneId(), SqlGatewayRestEndpointStatementITCase.class.getClassLoader(), false), @@ -234,10 +239,11 @@ class SqlGatewayRestEndpointStatementITCase extends AbstractSqlGatewayStatementI private void fetch() throws Exception { FetchResultsResponseBody fetchResultsResponseBody = fetchResults(sessionHandle, operationHandle, token); + String nextResultUri = fetchResultsResponseBody.getNextResultUri(); - ResultSet resultSet = fetchResultsResponseBody.getResults(); token = parseTokenFromUri(nextResultUri); - fetchedRows = resultSet.getData().iterator(); + + fetchedRows = fetchResultsResponseBody.getResults().getData().iterator(); } } diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/serde/JsonResultSetSerDeTest.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/serde/ResultInfoJsonSerDeTest.java similarity index 58% rename from flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/serde/JsonResultSetSerDeTest.java rename to flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/serde/ResultInfoJsonSerDeTest.java index eee260f6093..3541e75cdad 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/serde/JsonResultSetSerDeTest.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/serde/ResultInfoJsonSerDeTest.java @@ -27,14 +27,19 @@ import org.apache.flink.table.gateway.api.results.ResultSet; import org.apache.flink.table.types.DataType; import org.apache.flink.types.Row; import org.apache.flink.types.RowKind; +import org.apache.flink.util.Preconditions; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule; +import org.apache.commons.io.IOUtils; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import java.io.IOException; import java.math.BigDecimal; +import java.net.URL; +import java.nio.charset.StandardCharsets; import java.sql.Timestamp; import java.time.Instant; import java.time.LocalDate; @@ -72,149 +77,108 @@ import static org.apache.flink.table.api.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZON import static org.apache.flink.table.api.DataTypes.TINYINT; import static org.assertj.core.api.Assertions.assertThat; -/** Tests for {@link JsonResultSetSerializer} and {@link JsonResultSetDeserializer}. */ -class JsonResultSetSerDeTest { +/** Tests for {@link ResultInfoJsonSerializer} and {@link ResultInfoJsonDeserializer}. */ +public class ResultInfoJsonSerDeTest { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final Row testRow = initRow(); - private static final byte tinyint = 'c'; - private static final short smallint = 128; - private static final int intValue = 45536; - private static final float floatValue = 33.333F; - private static final long bigint = 1238123899121L; - private static final String name = "asdlkjasjkdla998y1122"; - private static final byte[] bytes = new byte[1024]; - private static final Double[] doubles = new Double[] {1.1, 2.2, 3.3}; - private static final BigDecimal decimal = new BigDecimal("123.456789"); - private static final LocalDate date = LocalDate.parse("1990-10-14"); - private static final LocalTime time = LocalTime.parse("12:12:43"); - private static final Timestamp timestamp3 = Timestamp.valueOf("1990-10-14 12:12:43.123"); - private static final Timestamp timestamp9 = Timestamp.valueOf("1990-10-14 12:12:43.123456789"); - private static final Instant timestampWithLocalZone = - LocalDateTime.of(1990, 10, 14, 12, 12, 43, 123456789) - .atOffset(ZoneOffset.of("Z")) - .toInstant(); - private static final int ROW_NUMBER = 10; - - private static final Map<String, Long> map = new HashMap<>(); - private static final Map<String, Integer> multiSet = new HashMap<>(); - private static final Map<String, Map<String, Integer>> nestedMap = new HashMap<>(); - private static final Map<String, Integer> innerMap = new HashMap<>(); - - static { - map.put("element", 123L); - multiSet.put("element", 2); - innerMap.put("key", 234); - nestedMap.put("inner_map", innerMap); - ThreadLocalRandom.current().nextBytes(bytes); + @BeforeAll + public static void setUp() { + SimpleModule simpleModule = new SimpleModule(); + simpleModule.addSerializer(ResultInfo.class, new ResultInfoJsonSerializer()); + simpleModule.addDeserializer(ResultInfo.class, new ResultInfoJsonDeserializer()); + OBJECT_MAPPER.registerModule(simpleModule); } @Test - void testSerDeResultSetWithSingleRowData() throws Exception { - Row row = getTestRowData(); - seDeResultSet(Collections.singletonList(row), getFields()); + public void testResultInfoSerDeWithSingleRow() throws Exception { + serDeTest(Collections.singletonList(testRow)); } @Test - void testSerDeResultSetWithMultiRowData() throws Exception { - ArrayList<Row> rowList = new ArrayList<>(); - for (int i = 0; i < ROW_NUMBER; ++i) { - rowList.add(getTestRowData()); + public void testResultInfoSerDeWithMultiRowData() throws Exception { + List<Row> rows = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + rows.add(testRow); } - seDeResultSet(rowList, getFields()); + serDeTest(rows); } @Test - void testSerDeResultSetWithRowDataNullValues() throws Exception { - List<Row> rowList = new ArrayList<>(); + public void testResultInfoSerDeWithNullValues() throws Exception { + List<Row> rows = new ArrayList<>(); List<Integer> positions = new ArrayList<>(); - for (int i = 0; i < 18; ++i) { + for (int i = 0; i < 18; i++) { positions.add(new Random().nextInt(18)); } - for (int i = 0; i < ROW_NUMBER; ++i) { - rowList.add(getTestRowDataWithNullValues(positions)); + for (int i = 0; i < 10; i++) { + rows.add(getTestRowDataWithNullValues(initRow(), positions)); } - seDeResultSet(rowList, getFields()); + serDeTest(rows); } - void seDeResultSet(List<Row> rowList, List<DataTypes.Field> fields) throws IOException { + @Test + public void testDeserializationFromJson() throws Exception { + URL url = ResultInfoJsonSerDeTest.class.getClassLoader().getResource("resultInfo.txt"); + String input = + IOUtils.toString(Preconditions.checkNotNull(url), StandardCharsets.UTF_8).trim(); + ResultInfo deserializedResult = OBJECT_MAPPER.readValue(input, ResultInfo.class); + assertThat(OBJECT_MAPPER.writeValueAsString(deserializedResult)).isEqualTo(input); + } + + private void serDeTest(List<Row> rows) throws IOException { List<RowData> rowDataList = - rowList.stream() - .map(JsonResultSetSerDeTest::convertToInternal) - .collect(Collectors.toList()); - ResolvedSchema testResolvedSchema = getTestResolvedSchema(fields); - ResultSet testResultSet = - new ResultSet(ResultSet.ResultType.PAYLOAD, 0L, testResolvedSchema, rowDataList); - // Test serialization & deserialization - ObjectMapper objectMapper = new ObjectMapper(); - SimpleModule resultSetModule = new SimpleModule(); - resultSetModule.addSerializer(ResultSet.class, new JsonResultSetSerializer()); - resultSetModule.addDeserializer(ResultSet.class, new JsonResultSetDeserializer()); - objectMapper.registerModule(resultSetModule); - String result = objectMapper.writeValueAsString(testResultSet); - ResultSet resultSet = objectMapper.readValue(result, ResultSet.class); - List<RowData> deRowDataList = resultSet.getData(); - for (int i = 0; i < deRowDataList.size(); ++i) { - assertThat(convertToExternal(deRowDataList.get(i), ROW(getFields()))) - .isEqualTo(rowList.get(i)); + rows.stream().map(this::convertToInternal).collect(Collectors.toList()); + ResolvedSchema testResolvedSchema = getTestResolvedSchema(getFields()); + ResultInfo testResultInfo = + ResultInfo.createResultInfo( + new ResultSet( + ResultSet.ResultType.PAYLOAD, 0L, testResolvedSchema, rowDataList)); + + // test serialization & deserialization + String result = OBJECT_MAPPER.writeValueAsString(testResultInfo); + ResultInfo resultInfo = OBJECT_MAPPER.readValue(result, ResultInfo.class); + + assertThat(resultInfo.getResultSchema().toString()) + .isEqualTo(testResultInfo.getResultSchema().toString()); + + List<RowData> data = resultInfo.getData(); + for (int i = 0; i < data.size(); i++) { + assertThat(convertToExternal(data.get(i), ROW(getFields()))).isEqualTo(rows.get(i)); } - assertThat(resultSet.getResultSchema().toString()) - .isEqualTo(testResultSet.getResultSchema().toString()); } - private static ResolvedSchema getTestResolvedSchema(List<DataTypes.Field> fields) { - List<String> columnNames = - fields.stream().map(DataTypes.AbstractField::getName).collect(Collectors.toList()); - List<DataType> columnDataTypes = - fields.stream().map(DataTypes.Field::getDataType).collect(Collectors.toList()); - return ResolvedSchema.physical(columnNames, columnDataTypes); - } + private static Row initRow() { + final byte tinyint = 'c'; + final short smallint = 128; + final int intValue = 45536; + final float floatValue = 33.333F; + final long bigint = 1238123899121L; + final String name = "asdlkjasjkdla998y1122"; + final byte[] bytes = new byte[1024]; + ThreadLocalRandom.current().nextBytes(bytes); + final Double[] doubles = new Double[] {1.1, 2.2, 3.3}; + final BigDecimal decimal = new BigDecimal("123.456789"); + final LocalDate date = LocalDate.parse("1990-10-14"); + final LocalTime time = LocalTime.parse("12:12:43"); + final Timestamp timestamp3 = Timestamp.valueOf("1990-10-14 12:12:43.123"); + final Timestamp timestamp9 = Timestamp.valueOf("1990-10-14 12:12:43.123456789"); + final Instant timestampWithLocalZone = + LocalDateTime.of(1990, 10, 14, 12, 12, 43, 123456789) + .atOffset(ZoneOffset.of("Z")) + .toInstant(); - private static List<DataTypes.Field> getFields() { - return Arrays.asList( - FIELD("bool", BOOLEAN()), - FIELD("tinyint", TINYINT()), - FIELD("smallint", SMALLINT()), - FIELD("int", INT()), - FIELD("bigint", BIGINT()), - FIELD("float", FLOAT()), - FIELD("name", STRING()), - FIELD("bytes", BYTES()), - FIELD("decimal", DECIMAL(9, 6)), - FIELD("doubles", ARRAY(DOUBLE())), - FIELD("date", DATE()), - FIELD("time", TIME(0)), - FIELD("timestamp3", TIMESTAMP(3)), - FIELD("timestamp9", TIMESTAMP(9)), - FIELD("timestampWithLocalZone", TIMESTAMP_WITH_LOCAL_TIME_ZONE(9)), - FIELD("map", MAP(STRING(), BIGINT())), - FIELD("multiSet", MULTISET(STRING())), - FIELD("map2map", MAP(STRING(), MAP(STRING(), INT())))); - } + final Map<String, Long> map = new HashMap<>(); + map.put("element", 123L); - private static Row getTestRowData() { - Row testRow = new Row(18); - setRandomKind(testRow); - testRow.setField(0, true); - testRow.setField(1, tinyint); - testRow.setField(2, smallint); - testRow.setField(3, intValue); - testRow.setField(4, bigint); - testRow.setField(5, floatValue); - testRow.setField(6, name); - testRow.setField(7, bytes); - testRow.setField(8, decimal); - testRow.setField(9, doubles); - testRow.setField(10, date); - testRow.setField(11, time); - testRow.setField(12, timestamp3.toLocalDateTime()); - testRow.setField(13, timestamp9.toLocalDateTime()); - testRow.setField(14, timestampWithLocalZone); - testRow.setField(15, map); - testRow.setField(16, multiSet); - testRow.setField(17, nestedMap); - return testRow; - } + final Map<String, Integer> multiSet = new HashMap<>(); + multiSet.put("element", 2); + + final Map<String, Map<String, Integer>> nestedMap = new HashMap<>(); + Map<String, Integer> innerMap = new HashMap<>(); + innerMap.put("key", 234); + nestedMap.put("inner_map", innerMap); - private static Row getTestRowDataWithNullValues(List<Integer> positions) { Row testRow = new Row(18); setRandomKind(testRow); testRow.setField(0, true); @@ -235,9 +199,6 @@ class JsonResultSetSerDeTest { testRow.setField(15, map); testRow.setField(16, multiSet); testRow.setField(17, nestedMap); - for (int position : positions) { - testRow.setField(position, null); - } return testRow; } @@ -259,13 +220,50 @@ class JsonResultSetSerDeTest { } } + private List<DataTypes.Field> getFields() { + return Arrays.asList( + FIELD("bool", BOOLEAN()), + FIELD("tinyint", TINYINT()), + FIELD("smallint", SMALLINT()), + FIELD("int", INT()), + FIELD("bigint", BIGINT()), + FIELD("float", FLOAT()), + FIELD("name", STRING()), + FIELD("bytes", BYTES()), + FIELD("decimal", DECIMAL(9, 6)), + FIELD("doubles", ARRAY(DOUBLE())), + FIELD("date", DATE()), + FIELD("time", TIME(0)), + FIELD("timestamp3", TIMESTAMP(3)), + FIELD("timestamp9", TIMESTAMP(9)), + FIELD("timestampWithLocalZone", TIMESTAMP_WITH_LOCAL_TIME_ZONE(9)), + FIELD("map", MAP(STRING(), BIGINT())), + FIELD("multiSet", MULTISET(STRING())), + FIELD("map2map", MAP(STRING(), MAP(STRING(), INT())))); + } + + private ResolvedSchema getTestResolvedSchema(List<DataTypes.Field> fields) { + List<String> columnNames = + fields.stream().map(DataTypes.AbstractField::getName).collect(Collectors.toList()); + List<DataType> columnDataTypes = + fields.stream().map(DataTypes.Field::getDataType).collect(Collectors.toList()); + return ResolvedSchema.physical(columnNames, columnDataTypes); + } + + private Row getTestRowDataWithNullValues(Row testRow, List<Integer> positions) { + for (int position : positions) { + testRow.setField(position, null); + } + return testRow; + } + @SuppressWarnings("unchecked") - private static Row convertToExternal(RowData rowData, DataType dataType) { + private Row convertToExternal(RowData rowData, DataType dataType) { return (Row) DataFormatConverters.getConverterForDataType(dataType).toExternal(rowData); } @SuppressWarnings("unchecked") - private static GenericRowData convertToInternal(Row row) { + private GenericRowData convertToInternal(Row row) { DataFormatConverters.DataFormatConverter<GenericRowData, Row> converter = DataFormatConverters.getConverterForDataType(ROW(getFields())); return converter.toInternal(row); diff --git a/flink-table/flink-sql-gateway/src/test/resources/resultInfo.txt b/flink-table/flink-sql-gateway/src/test/resources/resultInfo.txt new file mode 100644 index 00000000000..42b972a757e --- /dev/null +++ b/flink-table/flink-sql-gateway/src/test/resources/resultInfo.txt @@ -0,0 +1 @@ +{"columns":[{"name":"bool","logicalType":{"type":"BOOLEAN","nullable":true},"comment":null},{"name":"tinyint","logicalType":{"type":"TINYINT","nullable":true},"comment":null},{"name":"smallint","logicalType":{"type":"SMALLINT","nullable":true},"comment":null},{"name":"int","logicalType":{"type":"INTEGER","nullable":true},"comment":null},{"name":"bigint","logicalType":{"type":"BIGINT","nullable":true},"comment":null},{"name":"float","logicalType":{"type":"FLOAT","nullable":true},"comment" [...] diff --git a/pom.xml b/pom.xml index 417c8c5028b..ab0a7000527 100644 --- a/pom.xml +++ b/pom.xml @@ -1530,6 +1530,7 @@ under the License. <exclude>flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/*.java</exclude> <exclude>flink-formats/flink-parquet/src/test/resources/avro/**</exclude> <exclude>flink-formats/flink-parquet/src/test/resources/protobuf/**</exclude> + <exclude>flink-table/flink-sql-gateway/src/test/resources/*.txt</exclude> <!-- netty test file, still Apache License 2.0 but with a different header --> <exclude>flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/AbstractByteBufTest.java</exclude> <!-- Configuration Files. -->