This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 75992f495192b84cfe406d7e24a1188c7cb284b0 Author: Timo Walther <twal...@apache.org> AuthorDate: Wed Jan 12 14:34:43 2022 +0100 [FLINK-25230][table-planner] Replace RelDataType with LogicalType serialization --- .../exec/serde/AggregateCallJsonSerializer.java | 2 +- .../exec/serde/RelDataTypeJsonDeserializer.java | 173 +----- .../exec/serde/RelDataTypeJsonSerializer.java | 146 +---- .../nodes/exec/serde/RexNodeJsonSerializer.java | 74 ++- .../exec/serde/RexWindowBoundJsonSerializer.java | 3 +- .../planner/plan/schema/StructuredRelDataType.java | 2 +- .../typeutils/LogicalRelDataTypeConverter.java | 649 +++++++++++++++++++++ .../nodes/exec/serde/DataTypeJsonSerdeTest.java | 51 +- .../serde/DynamicTableSourceSpecSerdeTest.java | 3 + .../plan/nodes/exec/serde/JsonSerdeMocks.java | 76 +++ .../nodes/exec/serde/LogicalTypeJsonSerdeTest.java | 45 +- .../nodes/exec/serde/RelDataTypeJsonSerdeTest.java | 219 +++---- .../plan/nodes/exec/serde/RexNodeSerdeTest.java | 103 +--- .../nodes/exec/serde/RexWindowBoundSerdeTest.java | 3 +- .../serde/TemporalTableSourceSpecSerdeTest.java | 8 +- .../typeutils/LogicalRelDataTypeConverterTest.java | 215 +++++++ 16 files changed, 1157 insertions(+), 615 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonSerializer.java index 92c85b4..27f2549 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonSerializer.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonSerializer.java @@ -83,7 +83,7 @@ public class AggregateCallJsonSerializer extends StdSerializer<AggregateCall> { jsonGenerator.writeBooleanField(FIELD_NAME_DISTINCT, aggCall.isDistinct()); 
jsonGenerator.writeBooleanField(FIELD_NAME_APPROXIMATE, aggCall.isApproximate()); jsonGenerator.writeBooleanField(FIELD_NAME_IGNORE_NULLS, aggCall.ignoreNulls()); - jsonGenerator.writeObjectField(FIELD_NAME_TYPE, aggCall.getType()); + serializerProvider.defaultSerializeField(FIELD_NAME_TYPE, aggCall.getType(), jsonGenerator); jsonGenerator.writeEndObject(); } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonDeserializer.java index 1476e41..6b35780 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonDeserializer.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonDeserializer.java @@ -18,57 +18,26 @@ package org.apache.flink.table.planner.plan.nodes.exec.serde; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.table.api.TableException; +import org.apache.flink.annotation.Internal; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.typeutils.LogicalRelDataTypeConverter; import org.apache.flink.table.types.logical.LogicalType; -import org.apache.flink.table.types.logical.RawType; -import org.apache.flink.table.types.logical.StructuredType; -import org.apache.flink.table.types.logical.TimestampKind; -import org.apache.flink.table.types.logical.TypeInformationRawType; -import org.apache.flink.table.types.logical.utils.LogicalTypeParser; -import org.apache.flink.table.utils.EncodingUtils; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.ObjectCodec; import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.TextNode; -import org.apache.calcite.avatica.util.TimeUnit; import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.rel.type.StructKind; -import org.apache.calcite.sql.SqlIntervalQualifier; -import org.apache.calcite.sql.parser.SqlParserPos; -import org.apache.calcite.sql.type.SqlTypeName; -import org.apache.calcite.util.Util; import java.io.IOException; -import static org.apache.flink.table.planner.plan.nodes.exec.serde.RelDataTypeJsonSerializer.FIELD_NAME_ELEMENT; -import static org.apache.flink.table.planner.plan.nodes.exec.serde.RelDataTypeJsonSerializer.FIELD_NAME_FIELDS; -import static org.apache.flink.table.planner.plan.nodes.exec.serde.RelDataTypeJsonSerializer.FIELD_NAME_FILED_NAME; -import static org.apache.flink.table.planner.plan.nodes.exec.serde.RelDataTypeJsonSerializer.FIELD_NAME_KEY; -import static org.apache.flink.table.planner.plan.nodes.exec.serde.RelDataTypeJsonSerializer.FIELD_NAME_NULLABLE; -import static org.apache.flink.table.planner.plan.nodes.exec.serde.RelDataTypeJsonSerializer.FIELD_NAME_PRECISION; -import static org.apache.flink.table.planner.plan.nodes.exec.serde.RelDataTypeJsonSerializer.FIELD_NAME_RAW_TYPE; -import static org.apache.flink.table.planner.plan.nodes.exec.serde.RelDataTypeJsonSerializer.FIELD_NAME_SCALE; -import static org.apache.flink.table.planner.plan.nodes.exec.serde.RelDataTypeJsonSerializer.FIELD_NAME_STRUCTURED_TYPE; -import 
static org.apache.flink.table.planner.plan.nodes.exec.serde.RelDataTypeJsonSerializer.FIELD_NAME_STRUCT_KIND; -import static org.apache.flink.table.planner.plan.nodes.exec.serde.RelDataTypeJsonSerializer.FIELD_NAME_TIMESTAMP_KIND; -import static org.apache.flink.table.planner.plan.nodes.exec.serde.RelDataTypeJsonSerializer.FIELD_NAME_TYPE_INFO; -import static org.apache.flink.table.planner.plan.nodes.exec.serde.RelDataTypeJsonSerializer.FIELD_NAME_TYPE_NAME; -import static org.apache.flink.table.planner.plan.nodes.exec.serde.RelDataTypeJsonSerializer.FIELD_NAME_VALUE; -import static org.apache.flink.util.Preconditions.checkArgument; - /** - * JSON deserializer for {@link RelDataType}. refer to {@link RelDataTypeJsonSerializer} for - * serializer. + * JSON deserializer for {@link RelDataType}. + * + * @see RelDataTypeJsonSerializer for the reverse operation */ +@Internal public class RelDataTypeJsonDeserializer extends StdDeserializer<RelDataType> { private static final long serialVersionUID = 1L; @@ -79,129 +48,11 @@ public class RelDataTypeJsonDeserializer extends StdDeserializer<RelDataType> { @Override public RelDataType deserialize(JsonParser jsonParser, DeserializationContext ctx) throws IOException { - JsonNode jsonNode = jsonParser.readValueAsTree(); - return deserialize(jsonNode, jsonParser.getCodec(), ctx); - } - - private RelDataType deserialize( - JsonNode jsonNode, ObjectCodec codec, DeserializationContext ctx) throws IOException { - SerdeContext serdeContext = SerdeContext.get(ctx); - FlinkTypeFactory typeFactory = serdeContext.getTypeFactory(); - if (jsonNode instanceof ObjectNode) { - ObjectNode objectNode = (ObjectNode) jsonNode; - if (objectNode.has(FIELD_NAME_TIMESTAMP_KIND)) { - boolean nullable = objectNode.get(FIELD_NAME_NULLABLE).booleanValue(); - String typeName = objectNode.get(FIELD_NAME_TYPE_NAME).textValue(); - boolean isTimestampLtz = - SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE.name().equals(typeName); - TimestampKind timestampKind = - 
TimestampKind.valueOf( - objectNode.get(FIELD_NAME_TIMESTAMP_KIND).asText().toUpperCase()); - switch (timestampKind) { - case ROWTIME: - return typeFactory.createRowtimeIndicatorType(nullable, isTimestampLtz); - case PROCTIME: - return typeFactory.createProctimeIndicatorType(nullable); - default: - throw new TableException(timestampKind + " is not supported."); - } - } else if (objectNode.has(FIELD_NAME_STRUCTURED_TYPE)) { - JsonNode structuredTypeNode = objectNode.get(FIELD_NAME_STRUCTURED_TYPE); - LogicalType structuredType = - ctx.readValue(structuredTypeNode.traverse(codec), LogicalType.class); - checkArgument(structuredType instanceof StructuredType); - return serdeContext.getTypeFactory().createFieldTypeFromLogicalType(structuredType); - } else if (objectNode.has(FIELD_NAME_STRUCT_KIND)) { - ArrayNode arrayNode = (ArrayNode) objectNode.get(FIELD_NAME_FIELDS); - RelDataTypeFactory.Builder builder = typeFactory.builder(); - for (JsonNode node : arrayNode) { - builder.add( - node.get(FIELD_NAME_FILED_NAME).asText(), - deserialize(node, codec, ctx)); - } - StructKind structKind = - StructKind.valueOf( - objectNode.get(FIELD_NAME_STRUCT_KIND).asText().toUpperCase()); - boolean nullable = objectNode.get(FIELD_NAME_NULLABLE).booleanValue(); - return builder.kind(structKind).nullableRecord(nullable).build(); - } else if (objectNode.has(FIELD_NAME_FIELDS)) { - JsonNode fields = objectNode.get(FIELD_NAME_FIELDS); - // Nested struct - return deserialize(fields, codec, ctx); - } else { - SqlTypeName sqlTypeName = - Util.enumVal( - SqlTypeName.class, objectNode.get(FIELD_NAME_TYPE_NAME).asText()); - boolean nullable = objectNode.get(FIELD_NAME_NULLABLE).booleanValue(); - if (SqlTypeName.INTERVAL_TYPES.contains(sqlTypeName)) { - TimeUnit startUnit = sqlTypeName.getStartUnit(); - TimeUnit endUnit = sqlTypeName.getEndUnit(); - return typeFactory.createTypeWithNullability( - typeFactory.createSqlIntervalType( - new SqlIntervalQualifier( - startUnit, endUnit, 
SqlParserPos.ZERO)), - nullable); - } - if (sqlTypeName == SqlTypeName.OTHER && objectNode.has(FIELD_NAME_RAW_TYPE)) { - RawType<?> rawType = - (RawType<?>) - LogicalTypeParser.parse( - objectNode.get(FIELD_NAME_RAW_TYPE).asText(), - serdeContext.getClassLoader()); - return typeFactory.createTypeWithNullability( - typeFactory.createFieldTypeFromLogicalType(rawType), nullable); - } - if (sqlTypeName == SqlTypeName.ANY && objectNode.has(FIELD_NAME_RAW_TYPE)) { - JsonNode rawTypeNode = objectNode.get(FIELD_NAME_RAW_TYPE); - boolean nullableOfTypeInfo = - rawTypeNode.get(FIELD_NAME_NULLABLE).booleanValue(); - TypeInformation<?> typeInfo = - EncodingUtils.decodeStringToObject( - rawTypeNode.get(FIELD_NAME_TYPE_INFO).asText(), - TypeInformation.class, - serdeContext.getClassLoader()); - TypeInformationRawType<?> rawType = - new TypeInformationRawType<>(nullableOfTypeInfo, typeInfo); - return typeFactory.createTypeWithNullability( - typeFactory.createFieldTypeFromLogicalType(rawType), nullable); - } - - Integer precision = - objectNode.has(FIELD_NAME_PRECISION) - ? objectNode.get(FIELD_NAME_PRECISION).intValue() - : null; - Integer scale = - objectNode.has(FIELD_NAME_SCALE) - ? 
objectNode.get(FIELD_NAME_SCALE).intValue() - : null; - final RelDataType type; - if (sqlTypeName == SqlTypeName.ARRAY) { - RelDataType elementType = - deserialize(objectNode.get(FIELD_NAME_ELEMENT), codec, ctx); - type = typeFactory.createArrayType(elementType, -1); - } else if (sqlTypeName == SqlTypeName.MULTISET) { - RelDataType elementType = - deserialize(objectNode.get(FIELD_NAME_ELEMENT), codec, ctx); - type = typeFactory.createMultisetType(elementType, -1); - } else if (sqlTypeName == SqlTypeName.MAP) { - RelDataType keyType = deserialize(objectNode.get(FIELD_NAME_KEY), codec, ctx); - RelDataType valueType = - deserialize(objectNode.get(FIELD_NAME_VALUE), codec, ctx); - type = typeFactory.createMapType(keyType, valueType); - } else if (precision == null) { - type = typeFactory.createSqlType(sqlTypeName); - } else if (scale == null) { - type = typeFactory.createSqlType(sqlTypeName, precision); - } else { - type = typeFactory.createSqlType(sqlTypeName, precision, scale); - } - return typeFactory.createTypeWithNullability(type, nullable); - } - } else if (jsonNode instanceof TextNode) { - SqlTypeName sqlTypeName = Util.enumVal(SqlTypeName.class, jsonNode.asText()); - return typeFactory.createSqlType(sqlTypeName); - } else { - throw new TableException("Unknown type: " + jsonNode.toPrettyString()); - } + final JsonNode logicalTypeNode = jsonParser.readValueAsTree(); + final SerdeContext serdeContext = SerdeContext.get(ctx); + final FlinkTypeFactory typeFactory = serdeContext.getTypeFactory(); + final LogicalType logicalType = + LogicalTypeJsonDeserializer.deserialize(logicalTypeNode, serdeContext); + return LogicalRelDataTypeConverter.toRelDataType(logicalType, typeFactory); } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonSerializer.java index 
01f500c..5d76abb 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonSerializer.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonSerializer.java @@ -18,49 +18,28 @@ package org.apache.flink.table.planner.plan.nodes.exec.serde; -import org.apache.flink.table.planner.plan.schema.GenericRelDataType; -import org.apache.flink.table.planner.plan.schema.RawRelDataType; -import org.apache.flink.table.planner.plan.schema.StructuredRelDataType; -import org.apache.flink.table.planner.plan.schema.TimeIndicatorRelDataType; -import org.apache.flink.table.types.logical.TimestampKind; -import org.apache.flink.table.types.logical.TypeInformationRawType; -import org.apache.flink.table.utils.EncodingUtils; +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.catalog.DataTypeFactory; +import org.apache.flink.table.planner.typeutils.LogicalRelDataTypeConverter; +import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer; import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeField; -import org.apache.calcite.sql.type.ArraySqlType; -import org.apache.calcite.sql.type.MapSqlType; -import org.apache.calcite.sql.type.MultisetSqlType; -import org.apache.calcite.sql.type.SqlTypeName; import java.io.IOException; /** - * JSON serializer for {@link RelDataType}. refer to {@link RelDataTypeJsonDeserializer} for - * deserializer. + * JSON serializer for {@link RelDataType}. + * + * @see RelDataTypeJsonDeserializer for the reverse operation. 
*/ -public class RelDataTypeJsonSerializer extends StdSerializer<RelDataType> { +@Internal +public final class RelDataTypeJsonSerializer extends StdSerializer<RelDataType> { private static final long serialVersionUID = 1L; - public static final String FIELD_NAME_TYPE_NAME = "typeName"; - public static final String FIELD_NAME_FILED_NAME = "fieldName"; - public static final String FIELD_NAME_NULLABLE = "nullable"; - public static final String FIELD_NAME_PRECISION = "precision"; - public static final String FIELD_NAME_SCALE = "scale"; - public static final String FIELD_NAME_FIELDS = "fields"; - public static final String FIELD_NAME_STRUCT_KIND = "structKind"; - public static final String FIELD_NAME_TIMESTAMP_KIND = "timestampKind"; - public static final String FIELD_NAME_ELEMENT = "element"; - public static final String FIELD_NAME_KEY = "key"; - public static final String FIELD_NAME_VALUE = "value"; - public static final String FIELD_NAME_TYPE_INFO = "typeInfo"; - public static final String FIELD_NAME_RAW_TYPE = "rawType"; - public static final String FIELD_NAME_STRUCTURED_TYPE = "structuredType"; - public RelDataTypeJsonSerializer() { super(RelDataType.class); } @@ -71,103 +50,14 @@ public class RelDataTypeJsonSerializer extends StdSerializer<RelDataType> { JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException { - jsonGenerator.writeStartObject(); - serializeInternal(relDataType, jsonGenerator, serializerProvider); - jsonGenerator.writeEndObject(); - } - - private void serializeInternal( - RelDataType relDataType, JsonGenerator gen, SerializerProvider serializerProvider) - throws IOException { - if (relDataType instanceof TimeIndicatorRelDataType) { - TimeIndicatorRelDataType timeIndicatorType = (TimeIndicatorRelDataType) relDataType; - gen.writeStringField( - FIELD_NAME_TIMESTAMP_KIND, - timeIndicatorType.isEventTime() - ? 
TimestampKind.ROWTIME.name() - : TimestampKind.PROCTIME.name()); - gen.writeStringField( - FIELD_NAME_TYPE_NAME, timeIndicatorType.originalType().getSqlTypeName().name()); - gen.writeBooleanField(FIELD_NAME_NULLABLE, relDataType.isNullable()); - } else if (relDataType instanceof StructuredRelDataType) { - StructuredRelDataType structuredType = (StructuredRelDataType) relDataType; - serializerProvider.defaultSerializeField( - FIELD_NAME_STRUCTURED_TYPE, structuredType.getStructuredType(), gen); - } else if (relDataType.isStruct()) { - gen.writeStringField(FIELD_NAME_STRUCT_KIND, relDataType.getStructKind().name()); - gen.writeBooleanField(FIELD_NAME_NULLABLE, relDataType.isNullable()); - - gen.writeFieldName(FIELD_NAME_FIELDS); - gen.writeStartArray(); - for (RelDataTypeField field : relDataType.getFieldList()) { - gen.writeStartObject(); - serializeInternal(field.getType(), gen, serializerProvider); - gen.writeStringField(FIELD_NAME_FILED_NAME, field.getName()); - gen.writeEndObject(); - } - gen.writeEndArray(); - } else if (relDataType.getSqlTypeName() == SqlTypeName.ARRAY) { - serializeCommon(relDataType, gen); - ArraySqlType arraySqlType = (ArraySqlType) relDataType; - - gen.writeFieldName(FIELD_NAME_ELEMENT); - gen.writeStartObject(); - serializeInternal(arraySqlType.getComponentType(), gen, serializerProvider); - gen.writeEndObject(); - } else if (relDataType.getSqlTypeName() == SqlTypeName.MULTISET) { - assert relDataType instanceof MultisetSqlType; - serializeCommon(relDataType, gen); - MultisetSqlType multisetSqlType = (MultisetSqlType) relDataType; - - gen.writeFieldName(FIELD_NAME_ELEMENT); - gen.writeStartObject(); - serializeInternal(multisetSqlType.getComponentType(), gen, serializerProvider); - gen.writeEndObject(); - } else if (relDataType.getSqlTypeName() == SqlTypeName.MAP) { - assert relDataType instanceof MapSqlType; - serializeCommon(relDataType, gen); - MapSqlType mapSqlType = (MapSqlType) relDataType; - - gen.writeFieldName(FIELD_NAME_KEY); - 
gen.writeStartObject(); - serializeInternal(mapSqlType.getKeyType(), gen, serializerProvider); - gen.writeEndObject(); - - gen.writeFieldName(FIELD_NAME_VALUE); - gen.writeStartObject(); - serializeInternal(mapSqlType.getValueType(), gen, serializerProvider); - gen.writeEndObject(); - } else if (relDataType instanceof GenericRelDataType) { - assert relDataType.getSqlTypeName() == SqlTypeName.ANY; - serializeCommon(relDataType, gen); - TypeInformationRawType<?> rawType = ((GenericRelDataType) relDataType).genericType(); - - gen.writeFieldName(FIELD_NAME_RAW_TYPE); - gen.writeStartObject(); - gen.writeBooleanField(FIELD_NAME_NULLABLE, rawType.isNullable()); - gen.writeStringField( - FIELD_NAME_TYPE_INFO, - EncodingUtils.encodeObjectToString(rawType.getTypeInformation())); - gen.writeEndObject(); - } else if (relDataType instanceof RawRelDataType) { - assert relDataType.getSqlTypeName() == SqlTypeName.OTHER; - serializeCommon(relDataType, gen); - RawRelDataType rawType = (RawRelDataType) relDataType; - gen.writeStringField(FIELD_NAME_RAW_TYPE, rawType.getRawType().asSerializableString()); - } else { - serializeCommon(relDataType, gen); - } - } - - private void serializeCommon(RelDataType relDataType, JsonGenerator gen) throws IOException { - final SqlTypeName typeName = relDataType.getSqlTypeName(); - gen.writeStringField(FIELD_NAME_TYPE_NAME, typeName.name()); - gen.writeBooleanField(FIELD_NAME_NULLABLE, relDataType.isNullable()); - if (relDataType.getSqlTypeName().allowsPrec()) { - gen.writeNumberField(FIELD_NAME_PRECISION, relDataType.getPrecision()); - } - if (relDataType.getSqlTypeName().allowsScale()) { - gen.writeNumberField(FIELD_NAME_SCALE, relDataType.getScale()); - } + final SerdeContext serdeContext = SerdeContext.get(serializerProvider); + final DataTypeFactory dataTypeFactory = + serdeContext.getFlinkContext().getCatalogManager().getDataTypeFactory(); + // Conversion to LogicalType also ensures that Calcite's type system is materialized + // so data 
types like DECIMAL will receive a concrete precision and scale (not unspecified + // anymore). + final LogicalType logicalType = + LogicalRelDataTypeConverter.toLogicalType(relDataType, dataTypeFactory); + serializerProvider.defaultSerializeValue(logicalType, jsonGenerator); } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java index f16e805..427e488 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java @@ -106,52 +106,63 @@ public class RexNodeJsonSerializer extends StdSerializer<RexNode> { switch (rexNode.getKind()) { case INPUT_REF: case TABLE_INPUT_REF: - serialize((RexInputRef) rexNode, jsonGenerator); + serialize((RexInputRef) rexNode, jsonGenerator, serializerProvider); break; case LITERAL: - serialize((RexLiteral) rexNode, jsonGenerator); + serialize((RexLiteral) rexNode, jsonGenerator, serializerProvider); break; case FIELD_ACCESS: - serialize((RexFieldAccess) rexNode, jsonGenerator); + serialize((RexFieldAccess) rexNode, jsonGenerator, serializerProvider); break; case CORREL_VARIABLE: - serialize((RexCorrelVariable) rexNode, jsonGenerator); + serialize((RexCorrelVariable) rexNode, jsonGenerator, serializerProvider); break; case PATTERN_INPUT_REF: - serialize((RexPatternFieldRef) rexNode, jsonGenerator); + serialize((RexPatternFieldRef) rexNode, jsonGenerator, serializerProvider); break; default: if (rexNode instanceof RexCall) { - serialize((RexCall) rexNode, jsonGenerator); + serialize((RexCall) rexNode, jsonGenerator, serializerProvider); } else { throw new TableException("Unknown RexNode: " + rexNode); } } } - private void 
serialize(RexPatternFieldRef inputRef, JsonGenerator gen) throws IOException { + private void serialize( + RexPatternFieldRef inputRef, JsonGenerator gen, SerializerProvider serializerProvider) + throws IOException { gen.writeStartObject(); gen.writeStringField(FIELD_NAME_KIND, SQL_KIND_PATTERN_INPUT_REF); gen.writeStringField(FIELD_NAME_ALPHA, inputRef.getAlpha()); gen.writeNumberField(FIELD_NAME_INPUT_INDEX, inputRef.getIndex()); - gen.writeObjectField(FIELD_NAME_TYPE, inputRef.getType()); + serializerProvider.defaultSerializeField(FIELD_NAME_TYPE, inputRef.getType(), gen); gen.writeEndObject(); } - private void serialize(RexInputRef inputRef, JsonGenerator gen) throws IOException { + private void serialize( + RexInputRef inputRef, JsonGenerator gen, SerializerProvider serializerProvider) + throws IOException { gen.writeStartObject(); gen.writeStringField(FIELD_NAME_KIND, SQL_KIND_INPUT_REF); gen.writeNumberField(FIELD_NAME_INPUT_INDEX, inputRef.getIndex()); - gen.writeObjectField(FIELD_NAME_TYPE, inputRef.getType()); + serializerProvider.defaultSerializeField(FIELD_NAME_TYPE, inputRef.getType(), gen); gen.writeEndObject(); } - private void serialize(RexLiteral literal, JsonGenerator gen) throws IOException { + private void serialize( + RexLiteral literal, JsonGenerator gen, SerializerProvider serializerProvider) + throws IOException { gen.writeStartObject(); gen.writeStringField(FIELD_NAME_KIND, SQL_KIND_LITERAL); Comparable<?> value = literal.getValueAs(Comparable.class); - serialize(value, literal.getTypeName(), literal.getType().getSqlTypeName(), gen); - gen.writeObjectField(FIELD_NAME_TYPE, literal.getType()); + serialize( + value, + literal.getTypeName(), + literal.getType().getSqlTypeName(), + gen, + serializerProvider); + serializerProvider.defaultSerializeField(FIELD_NAME_TYPE, literal.getType(), gen); gen.writeEndObject(); } @@ -160,7 +171,8 @@ public class RexNodeJsonSerializer extends StdSerializer<RexNode> { Comparable<?> value, SqlTypeName 
literalTypeName, SqlTypeName elementTypeName, - JsonGenerator gen) + JsonGenerator gen, + SerializerProvider serializerProvider) throws IOException { if (value == null) { gen.writeNullField(FIELD_NAME_VALUE); @@ -229,14 +241,14 @@ public class RexNodeJsonSerializer extends StdSerializer<RexNode> { gen.writeStringField(FIELD_NAME_CLASS, value.getClass().getName()); break; case SARG: - serialize((Sarg<?>) value, elementTypeName, gen); + serialize((Sarg<?>) value, elementTypeName, gen, serializerProvider); break; case ROW: case MULTISET: gen.writeFieldName(FIELD_NAME_VALUE); gen.writeStartArray(); for (RexLiteral v : (FlatLists.ComparableList<RexLiteral>) value) { - serialize(v, gen); + serialize(v, gen, serializerProvider); } gen.writeEndArray(); break; @@ -247,7 +259,11 @@ public class RexNodeJsonSerializer extends StdSerializer<RexNode> { } @SuppressWarnings("UnstableApiUsage") - private void serialize(Sarg<?> value, SqlTypeName sqlTypeName, JsonGenerator gen) + private void serialize( + Sarg<?> value, + SqlTypeName sqlTypeName, + JsonGenerator gen, + SerializerProvider serializerProvider) throws IOException { gen.writeFieldName(FIELD_NAME_SARG); gen.writeStartObject(); @@ -258,14 +274,14 @@ public class RexNodeJsonSerializer extends StdSerializer<RexNode> { if (range.hasLowerBound()) { gen.writeFieldName(FIELD_NAME_BOUND_LOWER); gen.writeStartObject(); - serialize(range.lowerEndpoint(), sqlTypeName, sqlTypeName, gen); + serialize(range.lowerEndpoint(), sqlTypeName, sqlTypeName, gen, serializerProvider); gen.writeStringField(FIELD_NAME_BOUND_TYPE, range.lowerBoundType().name()); gen.writeEndObject(); } if (range.hasUpperBound()) { gen.writeFieldName(FIELD_NAME_BOUND_UPPER); gen.writeStartObject(); - serialize(range.upperEndpoint(), sqlTypeName, sqlTypeName, gen); + serialize(range.upperEndpoint(), sqlTypeName, sqlTypeName, gen, serializerProvider); gen.writeStringField(FIELD_NAME_BOUND_TYPE, range.upperBoundType().name()); gen.writeEndObject(); } @@ -276,23 +292,29 
@@ public class RexNodeJsonSerializer extends StdSerializer<RexNode> { gen.writeEndObject(); } - private void serialize(RexFieldAccess fieldAccess, JsonGenerator gen) throws IOException { + private void serialize( + RexFieldAccess fieldAccess, JsonGenerator gen, SerializerProvider serializerProvider) + throws IOException { gen.writeStartObject(); gen.writeStringField(FIELD_NAME_KIND, SQL_KIND_FIELD_ACCESS); gen.writeStringField(FIELD_NAME_NAME, fieldAccess.getField().getName()); - gen.writeObjectField(FIELD_NAME_EXPR, fieldAccess.getReferenceExpr()); + serializerProvider.defaultSerializeField( + FIELD_NAME_EXPR, fieldAccess.getReferenceExpr(), gen); gen.writeEndObject(); } - private void serialize(RexCorrelVariable variable, JsonGenerator gen) throws IOException { + private void serialize( + RexCorrelVariable variable, JsonGenerator gen, SerializerProvider serializerProvider) + throws IOException { gen.writeStartObject(); gen.writeStringField(FIELD_NAME_KIND, SQL_KIND_CORREL_VARIABLE); gen.writeStringField(FIELD_NAME_CORREL, variable.getName()); - gen.writeObjectField(FIELD_NAME_TYPE, variable.getType()); + serializerProvider.defaultSerializeField(FIELD_NAME_TYPE, variable.getType(), gen); gen.writeEndObject(); } - private void serialize(RexCall call, JsonGenerator gen) throws IOException { + private void serialize(RexCall call, JsonGenerator gen, SerializerProvider serializerProvider) + throws IOException { if (!call.getClass().isAssignableFrom(RexCall.class)) { throw new TableException("Unknown RexCall: " + call); } @@ -302,10 +324,10 @@ public class RexNodeJsonSerializer extends StdSerializer<RexNode> { gen.writeFieldName(FIELD_NAME_OPERANDS); gen.writeStartArray(); for (RexNode operand : call.getOperands()) { - gen.writeObject(operand); + serializerProvider.defaultSerializeValue(operand, gen); } gen.writeEndArray(); - gen.writeObjectField(FIELD_NAME_TYPE, call.getType()); + serializerProvider.defaultSerializeField(FIELD_NAME_TYPE, call.getType(), gen); 
gen.writeEndObject(); } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundJsonSerializer.java index ac6dcc2..021731c 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundJsonSerializer.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundJsonSerializer.java @@ -75,7 +75,8 @@ public class RexWindowBoundJsonSerializer extends StdSerializer<RexWindowBound> } else { throw new TableException("Unknown RexWindowBound: " + rexWindowBound); } - gen.writeObjectField(FIELD_NAME_OFFSET, rexWindowBound.getOffset()); + serializerProvider.defaultSerializeField( + FIELD_NAME_OFFSET, rexWindowBound.getOffset(), gen); } gen.writeEndObject(); } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/schema/StructuredRelDataType.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/schema/StructuredRelDataType.java index 78b5ec3..3021040 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/schema/StructuredRelDataType.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/schema/StructuredRelDataType.java @@ -54,7 +54,7 @@ public final class StructuredRelDataType extends ObjectSqlType { private final StructuredType structuredType; - private StructuredRelDataType(StructuredType structuredType, List<RelDataTypeField> fields) { + public StructuredRelDataType(StructuredType structuredType, List<RelDataTypeField> fields) { super( SqlTypeName.STRUCTURED, createSqlIdentifier(structuredType), diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/typeutils/LogicalRelDataTypeConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/typeutils/LogicalRelDataTypeConverter.java new file mode 100644 index 0000000..5645031 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/typeutils/LogicalRelDataTypeConverter.java @@ -0,0 +1,649 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.typeutils; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.catalog.DataTypeFactory; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.plan.schema.RawRelDataType; +import org.apache.flink.table.planner.plan.schema.StructuredRelDataType; +import org.apache.flink.table.planner.plan.schema.TimeIndicatorRelDataType; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.BinaryType; +import org.apache.flink.table.types.logical.BooleanType; +import org.apache.flink.table.types.logical.CharType; +import org.apache.flink.table.types.logical.DateType; +import org.apache.flink.table.types.logical.DayTimeIntervalType; +import org.apache.flink.table.types.logical.DayTimeIntervalType.DayTimeResolution; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.DistinctType; +import org.apache.flink.table.types.logical.DoubleType; +import org.apache.flink.table.types.logical.FloatType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeVisitor; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.MultisetType; +import org.apache.flink.table.types.logical.NullType; +import org.apache.flink.table.types.logical.RawType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.RowType.RowField; +import org.apache.flink.table.types.logical.SmallIntType; +import org.apache.flink.table.types.logical.StructuredType; +import org.apache.flink.table.types.logical.SymbolType; +import 
org.apache.flink.table.types.logical.TimeType; +import org.apache.flink.table.types.logical.TimestampKind; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.TinyIntType; +import org.apache.flink.table.types.logical.VarBinaryType; +import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.table.types.logical.YearMonthIntervalType; +import org.apache.flink.table.types.logical.YearMonthIntervalType.YearMonthResolution; +import org.apache.flink.table.types.logical.ZonedTimestampType; + +import org.apache.calcite.avatica.util.TimeUnit; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rel.type.RelDataTypeFieldImpl; +import org.apache.calcite.rel.type.RelRecordType; +import org.apache.calcite.rel.type.StructKind; +import org.apache.calcite.sql.SqlIntervalQualifier; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.sql.type.BasicSqlType; +import org.apache.calcite.sql.type.SqlTypeName; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Symmetric converter between {@link LogicalType} and {@link RelDataType}. + * + * <p>This converter has many similarities with {@link FlinkTypeFactory} and might also replace it + * at some point. However, for now it is more consistent and does not lose information (i.e. for + * TIME(9) or interval types). It still delegates to {@link RelDataTypeFactory} but only for + * predefined/basic types. + * + * <p>Note: The conversion to {@link LogicalType} is not 100% symmetric and is currently optimized + * for expressions. Information about the {@link StructKind} of a {@link RelRecordType} is always + * set to {@link StructKind#PEEK_FIELDS_NO_EXPAND}. 
Missing precision and scale will be filled with + * Flink's default values such that all resulting {@link LogicalType}s will be fully resolved. + */ +@Internal +public final class LogicalRelDataTypeConverter { + + public static RelDataType toRelDataType( + LogicalType logicalType, RelDataTypeFactory relDataTypeFactory) { + final LogicalToRelDataTypeConverter converter = + new LogicalToRelDataTypeConverter(relDataTypeFactory); + final RelDataType relDataType = logicalType.accept(converter); + // this also canonizes in the factory (see SqlTypeFactoryImpl.canonize) + return relDataTypeFactory.createTypeWithNullability(relDataType, logicalType.isNullable()); + } + + public static LogicalType toLogicalType( + RelDataType relDataType, DataTypeFactory dataTypeFactory) { + final LogicalType logicalType = toLogicalTypeNotNull(relDataType, dataTypeFactory); + return logicalType.copy(relDataType.isNullable()); + } + + // -------------------------------------------------------------------------------------------- + // LogicalType to RelDataType + // -------------------------------------------------------------------------------------------- + + private static class LogicalToRelDataTypeConverter implements LogicalTypeVisitor<RelDataType> { + + private final RelDataTypeFactory relDataTypeFactory; + + LogicalToRelDataTypeConverter(RelDataTypeFactory relDataTypeFactory) { + this.relDataTypeFactory = relDataTypeFactory; + } + + @Override + public RelDataType visit(CharType charType) { + return relDataTypeFactory.createSqlType(SqlTypeName.CHAR, charType.getLength()); + } + + @Override + public RelDataType visit(VarCharType varCharType) { + return relDataTypeFactory.createSqlType(SqlTypeName.VARCHAR, varCharType.getLength()); + } + + @Override + public RelDataType visit(BooleanType booleanType) { + return relDataTypeFactory.createSqlType(SqlTypeName.BOOLEAN); + } + + @Override + public RelDataType visit(BinaryType binaryType) { + return 
relDataTypeFactory.createSqlType(SqlTypeName.BINARY, binaryType.getLength()); + } + + @Override + public RelDataType visit(VarBinaryType varBinaryType) { + return relDataTypeFactory.createSqlType( + SqlTypeName.VARBINARY, varBinaryType.getLength()); + } + + @Override + public RelDataType visit(DecimalType decimalType) { + return relDataTypeFactory.createSqlType( + SqlTypeName.DECIMAL, decimalType.getPrecision(), decimalType.getScale()); + } + + @Override + public RelDataType visit(TinyIntType tinyIntType) { + return relDataTypeFactory.createSqlType(SqlTypeName.TINYINT); + } + + @Override + public RelDataType visit(SmallIntType smallIntType) { + return relDataTypeFactory.createSqlType(SqlTypeName.SMALLINT); + } + + @Override + public RelDataType visit(IntType intType) { + return relDataTypeFactory.createSqlType(SqlTypeName.INTEGER); + } + + @Override + public RelDataType visit(BigIntType bigIntType) { + return relDataTypeFactory.createSqlType(SqlTypeName.BIGINT); + } + + @Override + public RelDataType visit(FloatType floatType) { + return relDataTypeFactory.createSqlType(SqlTypeName.FLOAT); + } + + @Override + public RelDataType visit(DoubleType doubleType) { + return relDataTypeFactory.createSqlType(SqlTypeName.DOUBLE); + } + + @Override + public RelDataType visit(DateType dateType) { + return relDataTypeFactory.createSqlType(SqlTypeName.DATE); + } + + @Override + public RelDataType visit(TimeType timeType) { + return relDataTypeFactory.createSqlType(SqlTypeName.TIME, timeType.getPrecision()); + } + + @Override + public RelDataType visit(TimestampType timestampType) { + final RelDataType timestampRelDataType = + relDataTypeFactory.createSqlType( + SqlTypeName.TIMESTAMP, timestampType.getPrecision()); + switch (timestampType.getKind()) { + case REGULAR: + return timestampRelDataType; + case ROWTIME: + assert timestampType.getPrecision() == 3; + return new TimeIndicatorRelDataType( + relDataTypeFactory.getTypeSystem(), + (BasicSqlType) timestampRelDataType, + 
timestampType.isNullable(), + true); + default: + throw new TableException("Unknown timestamp kind."); + } + } + + @Override + public RelDataType visit(ZonedTimestampType zonedTimestampType) { + throw new TableException("TIMESTAMP WITH TIME ZONE is currently not supported."); + } + + @Override + public RelDataType visit(LocalZonedTimestampType localZonedTimestampType) { + final RelDataType timestampRelDataType = + relDataTypeFactory.createSqlType( + SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE, + localZonedTimestampType.getPrecision()); + switch (localZonedTimestampType.getKind()) { + case REGULAR: + return timestampRelDataType; + case ROWTIME: + assert localZonedTimestampType.getPrecision() == 3; + return new TimeIndicatorRelDataType( + relDataTypeFactory.getTypeSystem(), + (BasicSqlType) timestampRelDataType, + localZonedTimestampType.isNullable(), + true); + case PROCTIME: + assert localZonedTimestampType.getPrecision() == 3; + return new TimeIndicatorRelDataType( + relDataTypeFactory.getTypeSystem(), + (BasicSqlType) timestampRelDataType, + localZonedTimestampType.isNullable(), + false); + default: + throw new TableException("Unknown timestamp kind."); + } + } + + @Override + public RelDataType visit(YearMonthIntervalType yearMonthIntervalType) { + final int yearPrecision = yearMonthIntervalType.getYearPrecision(); + final SqlIntervalQualifier intervalQualifier; + switch (yearMonthIntervalType.getResolution()) { + case YEAR: + intervalQualifier = + new SqlIntervalQualifier( + TimeUnit.YEAR, + yearPrecision, + TimeUnit.YEAR, + RelDataType.PRECISION_NOT_SPECIFIED, + SqlParserPos.ZERO); + break; + case YEAR_TO_MONTH: + intervalQualifier = + new SqlIntervalQualifier( + TimeUnit.YEAR, + yearPrecision, + TimeUnit.MONTH, + RelDataType.PRECISION_NOT_SPECIFIED, + SqlParserPos.ZERO); + break; + case MONTH: + intervalQualifier = + new SqlIntervalQualifier( + TimeUnit.MONTH, + RelDataType.PRECISION_NOT_SPECIFIED, + TimeUnit.MONTH, + RelDataType.PRECISION_NOT_SPECIFIED, + 
SqlParserPos.ZERO); + break; + default: + throw new IllegalArgumentException("Unknown interval resolution."); + } + return relDataTypeFactory.createSqlIntervalType(intervalQualifier); + } + + @Override + public RelDataType visit(DayTimeIntervalType dayTimeIntervalType) { + final int dayPrecision = dayTimeIntervalType.getDayPrecision(); + final int fractionalPrecision = dayTimeIntervalType.getFractionalPrecision(); + final SqlIntervalQualifier intervalQualifier; + switch (dayTimeIntervalType.getResolution()) { + case DAY: + intervalQualifier = + new SqlIntervalQualifier( + TimeUnit.DAY, + dayPrecision, + TimeUnit.DAY, + RelDataType.PRECISION_NOT_SPECIFIED, + SqlParserPos.ZERO); + break; + case DAY_TO_HOUR: + intervalQualifier = + new SqlIntervalQualifier( + TimeUnit.DAY, + dayPrecision, + TimeUnit.HOUR, + RelDataType.PRECISION_NOT_SPECIFIED, + SqlParserPos.ZERO); + break; + case DAY_TO_MINUTE: + intervalQualifier = + new SqlIntervalQualifier( + TimeUnit.DAY, + dayPrecision, + TimeUnit.MINUTE, + RelDataType.PRECISION_NOT_SPECIFIED, + SqlParserPos.ZERO); + break; + case DAY_TO_SECOND: + intervalQualifier = + new SqlIntervalQualifier( + TimeUnit.DAY, + dayPrecision, + TimeUnit.SECOND, + fractionalPrecision, + SqlParserPos.ZERO); + break; + case HOUR: + intervalQualifier = + new SqlIntervalQualifier( + TimeUnit.HOUR, + RelDataType.PRECISION_NOT_SPECIFIED, + TimeUnit.HOUR, + RelDataType.PRECISION_NOT_SPECIFIED, + SqlParserPos.ZERO); + break; + case HOUR_TO_MINUTE: + intervalQualifier = + new SqlIntervalQualifier( + TimeUnit.HOUR, + RelDataType.PRECISION_NOT_SPECIFIED, + TimeUnit.MINUTE, + RelDataType.PRECISION_NOT_SPECIFIED, + SqlParserPos.ZERO); + break; + case HOUR_TO_SECOND: + intervalQualifier = + new SqlIntervalQualifier( + TimeUnit.HOUR, + RelDataType.PRECISION_NOT_SPECIFIED, + TimeUnit.SECOND, + fractionalPrecision, + SqlParserPos.ZERO); + break; + case MINUTE: + intervalQualifier = + new SqlIntervalQualifier( + TimeUnit.MINUTE, + 
RelDataType.PRECISION_NOT_SPECIFIED, + TimeUnit.MINUTE, + RelDataType.PRECISION_NOT_SPECIFIED, + SqlParserPos.ZERO); + break; + case MINUTE_TO_SECOND: + intervalQualifier = + new SqlIntervalQualifier( + TimeUnit.MINUTE, + RelDataType.PRECISION_NOT_SPECIFIED, + TimeUnit.SECOND, + fractionalPrecision, + SqlParserPos.ZERO); + break; + case SECOND: + intervalQualifier = + new SqlIntervalQualifier( + TimeUnit.SECOND, + RelDataType.PRECISION_NOT_SPECIFIED, + TimeUnit.SECOND, + fractionalPrecision, + SqlParserPos.ZERO); + break; + default: + throw new TableException("Unknown interval resolution."); + } + return relDataTypeFactory.createSqlIntervalType(intervalQualifier); + } + + @Override + public RelDataType visit(ArrayType arrayType) { + final RelDataType elementRelDataType = + toRelDataType(arrayType.getElementType(), relDataTypeFactory); + return relDataTypeFactory.createArrayType(elementRelDataType, -1); + } + + @Override + public RelDataType visit(MultisetType multisetType) { + final RelDataType elementRelDataType = + toRelDataType(multisetType.getElementType(), relDataTypeFactory); + return relDataTypeFactory.createMultisetType(elementRelDataType, -1); + } + + @Override + public RelDataType visit(MapType mapType) { + final RelDataType keyRelDataType = + toRelDataType(mapType.getKeyType(), relDataTypeFactory); + final RelDataType valueRelDataType = + toRelDataType(mapType.getValueType(), relDataTypeFactory); + return relDataTypeFactory.createMapType(keyRelDataType, valueRelDataType); + } + + @Override + public RelDataType visit(RowType rowType) { + return relDataTypeFactory.createStructType( + StructKind.PEEK_FIELDS_NO_EXPAND, + rowType.getFields().stream() + .map(f -> toRelDataType(f.getType(), relDataTypeFactory)) + .collect(Collectors.toList()), + rowType.getFieldNames()); + } + + @Override + public RelDataType visit(DistinctType distinctType) { + throw new TableException("DISTINCT type is currently not supported."); + } + + @Override + public RelDataType 
visit(StructuredType structuredType) { + final List<RelDataTypeField> fields = new ArrayList<>(); + for (int i = 0; i < structuredType.getAttributes().size(); i++) { + final StructuredType.StructuredAttribute attribute = + structuredType.getAttributes().get(i); + final RelDataTypeField field = + new RelDataTypeFieldImpl( + attribute.getName(), + i, + toRelDataType(attribute.getType(), relDataTypeFactory)); + fields.add(field); + } + return new StructuredRelDataType(structuredType, fields); + } + + @Override + public RelDataType visit(NullType nullType) { + return relDataTypeFactory.createSqlType(SqlTypeName.NULL); + } + + @Override + public RelDataType visit(RawType<?> rawType) { + return new RawRelDataType(rawType); + } + + @Override + public RelDataType visit(SymbolType<?> symbolType) { + return relDataTypeFactory.createSqlType(SqlTypeName.SYMBOL); + } + + @Override + public RelDataType visit(LogicalType other) { + throw new TableException( + String.format( + "Logical type '%s' cannot be converted to a RelDataType.", other)); + } + } + + // -------------------------------------------------------------------------------------------- + // RelDataType to LogicalType + // -------------------------------------------------------------------------------------------- + + private static LogicalType toLogicalTypeNotNull( + RelDataType relDataType, DataTypeFactory dataTypeFactory) { + // dataTypeFactory is a preparation for catalog user-defined types + switch (relDataType.getSqlTypeName()) { + case BOOLEAN: + return new BooleanType(false); + case TINYINT: + return new TinyIntType(false); + case SMALLINT: + return new SmallIntType(false); + case INTEGER: + return new IntType(false); + case BIGINT: + return new BigIntType(false); + case DECIMAL: + if (relDataType.getScale() < 0) { + // negative scale is not supported, normalize it + return new DecimalType( + false, relDataType.getPrecision() - relDataType.getScale(), 0); + } + return new DecimalType(false, 
relDataType.getPrecision(), relDataType.getScale()); + case FLOAT: + return new FloatType(false); + case DOUBLE: + return new DoubleType(false); + case DATE: + return new DateType(false); + case TIME: + return new TimeType(false, relDataType.getPrecision()); + case TIMESTAMP: + return new TimestampType( + false, getTimestampKind(relDataType), relDataType.getPrecision()); + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return new LocalZonedTimestampType( + false, getTimestampKind(relDataType), relDataType.getPrecision()); + case INTERVAL_YEAR: + case INTERVAL_YEAR_MONTH: + case INTERVAL_MONTH: + return new YearMonthIntervalType( + false, getYearMonthResolution(relDataType), relDataType.getPrecision()); + case INTERVAL_DAY: + case INTERVAL_DAY_HOUR: + case INTERVAL_DAY_MINUTE: + case INTERVAL_DAY_SECOND: + case INTERVAL_HOUR: + case INTERVAL_HOUR_MINUTE: + case INTERVAL_HOUR_SECOND: + case INTERVAL_MINUTE: + case INTERVAL_MINUTE_SECOND: + return new DayTimeIntervalType( + false, + getDayTimeResolution(relDataType), + relDataType.getPrecision(), + relDataType.getScale()); + case INTERVAL_SECOND: + return new DayTimeIntervalType( + false, + getDayTimeResolution(relDataType), + DayTimeIntervalType.DEFAULT_DAY_PRECISION, + relDataType.getScale()); + case CHAR: + if (relDataType.getPrecision() == 0) { + return CharType.ofEmptyLiteral(); + } + return new CharType(false, relDataType.getPrecision()); + case VARCHAR: + if (relDataType.getPrecision() == 0) { + return VarCharType.ofEmptyLiteral(); + } + return new VarCharType(false, relDataType.getPrecision()); + case BINARY: + if (relDataType.getPrecision() == 0) { + return BinaryType.ofEmptyLiteral(); + } + return new BinaryType(false, relDataType.getPrecision()); + case VARBINARY: + if (relDataType.getPrecision() == 0) { + return VarBinaryType.ofEmptyLiteral(); + } + return new VarBinaryType(false, relDataType.getPrecision()); + case NULL: + return new NullType(); + case SYMBOL: + return new SymbolType<>(false); + case MULTISET: + 
return new MultisetType( + false, toLogicalType(relDataType.getComponentType(), dataTypeFactory)); + case ARRAY: + return new ArrayType( + false, toLogicalType(relDataType.getComponentType(), dataTypeFactory)); + case MAP: + return new MapType( + false, + toLogicalType(relDataType.getKeyType(), dataTypeFactory), + toLogicalType(relDataType.getValueType(), dataTypeFactory)); + case DISTINCT: + throw new TableException("DISTINCT type is currently not supported."); + case ROW: + return new RowType( + false, + relDataType.getFieldList().stream() + .map( + f -> + new RowField( + f.getName(), + toLogicalType( + f.getType(), dataTypeFactory))) + .collect(Collectors.toList())); + case STRUCTURED: + case OTHER: + if (relDataType instanceof StructuredRelDataType) { + return ((StructuredRelDataType) relDataType).getStructuredType(); + } else if (relDataType instanceof RawRelDataType) { + return ((RawRelDataType) relDataType).getRawType(); + } + // fall through + case REAL: + case TIME_WITH_LOCAL_TIME_ZONE: + case ANY: + case CURSOR: + case COLUMN_LIST: + case DYNAMIC_STAR: + case GEOMETRY: + case SARG: + default: + throw new TableException("Unsupported RelDataType: " + relDataType); + } + } + + private static TimestampKind getTimestampKind(RelDataType relDataType) { + if (relDataType instanceof TimeIndicatorRelDataType) { + final TimeIndicatorRelDataType timeIndicator = (TimeIndicatorRelDataType) relDataType; + if (timeIndicator.isEventTime()) { + return TimestampKind.ROWTIME; + } else { + return TimestampKind.PROCTIME; + } + } else { + return TimestampKind.REGULAR; + } + } + + private static YearMonthResolution getYearMonthResolution(RelDataType relDataType) { + switch (relDataType.getSqlTypeName()) { + case INTERVAL_YEAR: + return YearMonthResolution.YEAR; + case INTERVAL_YEAR_MONTH: + return YearMonthResolution.YEAR_TO_MONTH; + case INTERVAL_MONTH: + return YearMonthResolution.MONTH; + default: + throw new TableException("Unsupported YearMonthResolution."); + } + } + + 
private static DayTimeResolution getDayTimeResolution(RelDataType relDataType) { + switch (relDataType.getSqlTypeName()) { + case INTERVAL_DAY: + return DayTimeResolution.DAY; + case INTERVAL_DAY_HOUR: + return DayTimeResolution.DAY_TO_HOUR; + case INTERVAL_DAY_MINUTE: + return DayTimeResolution.DAY_TO_MINUTE; + case INTERVAL_DAY_SECOND: + return DayTimeResolution.DAY_TO_SECOND; + case INTERVAL_HOUR: + return DayTimeResolution.HOUR; + case INTERVAL_HOUR_MINUTE: + return DayTimeResolution.HOUR_TO_MINUTE; + case INTERVAL_HOUR_SECOND: + return DayTimeResolution.HOUR_TO_SECOND; + case INTERVAL_MINUTE: + return DayTimeResolution.MINUTE; + case INTERVAL_MINUTE_SECOND: + return DayTimeResolution.MINUTE_TO_SECOND; + case INTERVAL_SECOND: + return DayTimeResolution.SECOND; + default: + throw new TableException("Unsupported DayTimeResolution."); + } + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DataTypeJsonSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DataTypeJsonSerdeTest.java index e2a335e..a7b176a 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DataTypeJsonSerdeTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DataTypeJsonSerdeTest.java @@ -19,18 +19,7 @@ package org.apache.flink.table.planner.plan.nodes.exec.serde; import org.apache.flink.table.api.DataTypes; -import org.apache.flink.table.api.TableConfig; -import org.apache.flink.table.catalog.CatalogManager; -import org.apache.flink.table.module.ModuleManager; -import org.apache.flink.table.planner.calcite.FlinkContextImpl; -import org.apache.flink.table.planner.calcite.FlinkTypeFactory; -import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable; import org.apache.flink.table.types.DataType; -import 
org.apache.flink.table.utils.CatalogManagerMocks; - -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; @@ -38,6 +27,9 @@ import org.junit.jupiter.params.provider.MethodSource; import java.io.IOException; import java.util.stream.Stream; +import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeMocks.configuredSerdeContext; +import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeMocks.toJson; +import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeMocks.toObject; import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link DataType} serialization and deserialization. */ @@ -48,7 +40,7 @@ public class DataTypeJsonSerdeTest { public void testDataTypeSerde(DataType dataType) throws IOException { final SerdeContext serdeContext = configuredSerdeContext(); final String json = toJson(serdeContext, dataType); - final DataType actual = toDataType(serdeContext, json); + final DataType actual = toObject(serdeContext, json, DataType.class); assertThat(actual).isEqualTo(dataType); } @@ -83,41 +75,6 @@ public class DataTypeJsonSerdeTest { // Shared utilities // -------------------------------------------------------------------------------------------- - static SerdeContext configuredSerdeContext() { - return configuredSerdeContext( - CatalogManagerMocks.createEmptyCatalogManager(), TableConfig.getDefault()); - } - - static SerdeContext configuredSerdeContext( - CatalogManager catalogManager, TableConfig tableConfig) { - return new SerdeContext( - new FlinkContextImpl( - false, tableConfig, new ModuleManager(), null, catalogManager, null), - Thread.currentThread().getContextClassLoader(), 
- FlinkTypeFactory.INSTANCE(), - FlinkSqlOperatorTable.instance()); - } - - static String toJson(SerdeContext serdeContext, DataType dataType) { - final ObjectWriter objectWriter = JsonSerdeUtil.createObjectWriter(serdeContext); - final String json; - try { - json = objectWriter.writeValueAsString(dataType); - } catch (JsonProcessingException e) { - throw new AssertionError(e); - } - return json; - } - - static DataType toDataType(SerdeContext serdeContext, String json) { - final ObjectReader objectReader = JsonSerdeUtil.createObjectReader(serdeContext); - try { - return objectReader.readValue(json, DataType.class); - } catch (IOException e) { - throw new AssertionError(e); - } - } - /** Testing class. */ public static class PojoClass { public int f0; diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java index 7d095f0..1fb5b43 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java @@ -53,6 +53,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectRea import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter; import org.apache.calcite.avatica.util.TimeUnit; +import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rex.RexBuilder; import org.apache.calcite.sql.SqlIntervalQualifier; import org.apache.calcite.sql.fun.SqlStdOperatorTable; @@ -221,7 +222,9 @@ public class DynamicTableSourceSpecSerdeTest { BigDecimal.valueOf(1000), new SqlIntervalQualifier( TimeUnit.SECOND, + RelDataType.PRECISION_NOT_SPECIFIED, TimeUnit.SECOND, + 3, 
SqlParserPos.ZERO))), 5000, RowType.of( diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeMocks.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeMocks.java new file mode 100644 index 0000000..f50dd9a --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeMocks.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.nodes.exec.serde; + +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.catalog.CatalogManager; +import org.apache.flink.table.module.ModuleManager; +import org.apache.flink.table.planner.calcite.FlinkContextImpl; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable; +import org.apache.flink.table.utils.CatalogManagerMocks; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter; + +import java.io.IOException; + +/** Mocks and utilities for serde tests. */ +final class JsonSerdeMocks { + + private JsonSerdeMocks() { + // no instantiation + } + + static SerdeContext configuredSerdeContext() { + return configuredSerdeContext( + CatalogManagerMocks.createEmptyCatalogManager(), TableConfig.getDefault()); + } + + static SerdeContext configuredSerdeContext( + CatalogManager catalogManager, TableConfig tableConfig) { + return new SerdeContext( + new FlinkContextImpl( + false, tableConfig, new ModuleManager(), null, catalogManager, null), + Thread.currentThread().getContextClassLoader(), + FlinkTypeFactory.INSTANCE(), + FlinkSqlOperatorTable.instance()); + } + + static String toJson(SerdeContext serdeContext, Object object) { + final ObjectWriter objectWriter = JsonSerdeUtil.createObjectWriter(serdeContext); + final String json; + try { + json = objectWriter.writeValueAsString(object); + } catch (JsonProcessingException e) { + throw new AssertionError(e); + } + return json; + } + + static <T> T toObject(SerdeContext serdeContext, String json, Class<T> clazz) { + final ObjectReader objectReader = JsonSerdeUtil.createObjectReader(serdeContext); + try { + return objectReader.readValue(json, clazz); + } 
catch (IOException e) { + throw new AssertionError(e); + } + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerdeTest.java index d96dcb5..b8ff8a5 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerdeTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerdeTest.java @@ -69,8 +69,6 @@ import org.apache.flink.types.Row; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter; import org.junit.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -88,7 +86,9 @@ import java.util.Optional; import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches; import static org.apache.flink.table.api.config.TableConfigOptions.CatalogPlanCompilation.ALL; import static org.apache.flink.table.api.config.TableConfigOptions.CatalogPlanCompilation.IDENTIFIER; -import static org.apache.flink.table.planner.plan.nodes.exec.serde.DataTypeJsonSerdeTest.configuredSerdeContext; +import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeMocks.configuredSerdeContext; +import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeMocks.toJson; +import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeMocks.toObject; import static 
org.apache.flink.table.utils.CatalogManagerMocks.preparedCatalogManager; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -102,7 +102,7 @@ public class LogicalTypeJsonSerdeTest { final SerdeContext serdeContext = configuredSerdeContext(); final String json = toJson(serdeContext, logicalType); - final LogicalType actual = toLogicalType(serdeContext, json); + final LogicalType actual = toObject(serdeContext, json, LogicalType.class); assertThat(actual).isEqualTo(logicalType); } @@ -126,7 +126,7 @@ public class LogicalTypeJsonSerdeTest { TableConfigOptions.PLAN_RESTORE_CATALOG_OBJECTS, TableConfigOptions.CatalogPlanRestore.IDENTIFIER); dataTypeFactoryMock.logicalType = Optional.empty(); - assertThatThrownBy(() -> toLogicalType(serdeContext, minimalJson)) + assertThatThrownBy(() -> toObject(serdeContext, minimalJson, LogicalType.class)) .satisfies(anyCauseMatches(ValidationException.class, "No type found.")); // catalog lookup @@ -134,7 +134,8 @@ public class LogicalTypeJsonSerdeTest { TableConfigOptions.PLAN_RESTORE_CATALOG_OBJECTS, TableConfigOptions.CatalogPlanRestore.IDENTIFIER); dataTypeFactoryMock.logicalType = Optional.of(STRUCTURED_TYPE); - assertThat(toLogicalType(serdeContext, minimalJson)).isEqualTo(STRUCTURED_TYPE); + assertThat(toObject(serdeContext, minimalJson, LogicalType.class)) + .isEqualTo(STRUCTURED_TYPE); // maximum plan content config.set(TableConfigOptions.PLAN_COMPILE_CATALOG_OBJECTS, ALL); @@ -151,7 +152,7 @@ public class LogicalTypeJsonSerdeTest { TableConfigOptions.PLAN_RESTORE_CATALOG_OBJECTS, TableConfigOptions.CatalogPlanRestore.IDENTIFIER); dataTypeFactoryMock.logicalType = Optional.empty(); - assertThatThrownBy(() -> toLogicalType(serdeContext, maximumJson)) + assertThatThrownBy(() -> toObject(serdeContext, maximumJson, LogicalType.class)) .satisfies(anyCauseMatches(ValidationException.class, "No type found.")); // catalog lookup @@ -159,14 +160,16 @@ public 
class LogicalTypeJsonSerdeTest { TableConfigOptions.PLAN_RESTORE_CATALOG_OBJECTS, TableConfigOptions.CatalogPlanRestore.IDENTIFIER); dataTypeFactoryMock.logicalType = Optional.of(UPDATED_STRUCTURED_TYPE); - assertThat(toLogicalType(serdeContext, maximumJson)).isEqualTo(UPDATED_STRUCTURED_TYPE); + assertThat(toObject(serdeContext, maximumJson, LogicalType.class)) + .isEqualTo(UPDATED_STRUCTURED_TYPE); // no lookup config.set( TableConfigOptions.PLAN_RESTORE_CATALOG_OBJECTS, TableConfigOptions.CatalogPlanRestore.ALL); dataTypeFactoryMock.logicalType = Optional.of(UPDATED_STRUCTURED_TYPE); - assertThat(toLogicalType(serdeContext, maximumJson)).isEqualTo(STRUCTURED_TYPE); + assertThat(toObject(serdeContext, maximumJson, LogicalType.class)) + .isEqualTo(STRUCTURED_TYPE); } // -------------------------------------------------------------------------------------------- @@ -418,28 +421,4 @@ public class LogicalTypeJsonSerdeTest { DataType dataType, boolean isInternalType) { return isInternalType ? 
dataType.toInternal() : dataType; } - - // -------------------------------------------------------------------------------------------- - // Shared utilities - // -------------------------------------------------------------------------------------------- - - static String toJson(SerdeContext serdeContext, LogicalType logicalType) { - final ObjectWriter objectWriter = JsonSerdeUtil.createObjectWriter(serdeContext); - final String json; - try { - json = objectWriter.writeValueAsString(logicalType); - } catch (JsonProcessingException e) { - throw new AssertionError(e); - } - return json; - } - - static LogicalType toLogicalType(SerdeContext serdeContext, String json) { - final ObjectReader objectReader = JsonSerdeUtil.createObjectReader(serdeContext); - try { - return objectReader.readValue(json, LogicalType.class); - } catch (IOException e) { - throw new AssertionError(e); - } - } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonSerdeTest.java index 69aa587..5426fe2 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonSerdeTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonSerdeTest.java @@ -18,26 +18,15 @@ package org.apache.flink.table.planner.plan.nodes.exec.serde; -import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.common.typeutils.base.VoidSerializer; -import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.catalog.ObjectIdentifier; -import org.apache.flink.table.module.ModuleManager; -import org.apache.flink.table.planner.calcite.FlinkContextImpl; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; -import 
org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable; import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.DayTimeIntervalType; import org.apache.flink.table.types.logical.IntType; -import org.apache.flink.table.types.logical.LegacyTypeInformationType; -import org.apache.flink.table.types.logical.LogicalTypeRoot; import org.apache.flink.table.types.logical.RawType; import org.apache.flink.table.types.logical.StructuredType; -import org.apache.flink.table.types.logical.TypeInformationRawType; import org.apache.flink.table.types.logical.VarCharType; -import org.apache.flink.table.utils.CatalogManagerMocks; - -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter; import org.apache.calcite.avatica.util.TimeUnit; import org.apache.calcite.rel.type.RelDataType; @@ -46,83 +35,123 @@ import org.apache.calcite.sql.SqlIntervalQualifier; import org.apache.calcite.sql.parser.SqlParserPos; import org.apache.calcite.sql.type.SqlTypeName; import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.runners.Parameterized.Parameters; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.List; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertSame; +import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeMocks.configuredSerdeContext; +import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeMocks.toJson; +import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeMocks.toObject; +import static org.assertj.core.api.Assertions.assertThat; -/** Tests for serialization/deserialization of 
{@link RelDataType}. */ -@RunWith(Parameterized.class) +/** Tests for {@link RelDataType} serialization and deserialization. */ public class RelDataTypeJsonSerdeTest { + private static final FlinkTypeFactory FACTORY = FlinkTypeFactory.INSTANCE(); - @Parameterized.Parameters(name = "type = {0}") - public static Collection<RelDataType> parameters() { + @ParameterizedTest + @MethodSource("testRelDataTypeSerde") + public void testRelDataTypeSerde(RelDataType relDataType) throws IOException { + final SerdeContext serdeContext = configuredSerdeContext(); + + final String json = toJson(serdeContext, relDataType); + final RelDataType actual = toObject(serdeContext, json, RelDataType.class); + + assertThat(actual).isSameAs(relDataType); + } + + @Test + public void testMissingPrecisionAndScale() { + final SerdeContext serdeContext = configuredSerdeContext(); + + final String json = + toJson( + serdeContext, + FACTORY.createSqlIntervalType( + new SqlIntervalQualifier( + TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO))); + final RelDataType actual = toObject(serdeContext, json, RelDataType.class); + + assertThat(actual) + .isSameAs( + FACTORY.createSqlIntervalType( + new SqlIntervalQualifier( + TimeUnit.DAY, + DayTimeIntervalType.DEFAULT_DAY_PRECISION, + TimeUnit.SECOND, + DayTimeIntervalType.DEFAULT_FRACTIONAL_PRECISION, + SqlParserPos.ZERO))); + } + + @Test + public void testNegativeScale() { + final SerdeContext serdeContext = configuredSerdeContext(); + + final String json = toJson(serdeContext, FACTORY.createSqlType(SqlTypeName.DECIMAL, 5, -1)); + final RelDataType actual = toObject(serdeContext, json, RelDataType.class); + + assertThat(actual).isSameAs(FACTORY.createSqlType(SqlTypeName.DECIMAL, 6, 0)); + } + + // -------------------------------------------------------------------------------------------- + // Test data + // -------------------------------------------------------------------------------------------- + + @Parameters(name = "{0}") + public static 
List<RelDataType> testRelDataTypeSerde() { // the values in the list do not care about nullable. - List<RelDataType> types = + final List<RelDataType> types = Arrays.asList( FACTORY.createSqlType(SqlTypeName.BOOLEAN), FACTORY.createSqlType(SqlTypeName.TINYINT), FACTORY.createSqlType(SqlTypeName.SMALLINT), FACTORY.createSqlType(SqlTypeName.INTEGER), FACTORY.createSqlType(SqlTypeName.BIGINT), - FACTORY.createSqlType(SqlTypeName.DECIMAL, 3, 10), - FACTORY.createSqlType(SqlTypeName.DECIMAL, 0, 19), - FACTORY.createSqlType(SqlTypeName.DECIMAL, -1, 19), + FACTORY.createSqlType(SqlTypeName.DECIMAL, 10, 3), + FACTORY.createSqlType(SqlTypeName.DECIMAL, 19, 0), + FACTORY.createSqlType(SqlTypeName.DECIMAL, 38, 19), FACTORY.createSqlType(SqlTypeName.FLOAT), - FACTORY.createSqlType(SqlTypeName.REAL), FACTORY.createSqlType(SqlTypeName.DOUBLE), FACTORY.createSqlType(SqlTypeName.DATE), FACTORY.createSqlType(SqlTypeName.TIME), - FACTORY.createSqlType(SqlTypeName.TIME_WITH_LOCAL_TIME_ZONE), FACTORY.createSqlType(SqlTypeName.TIMESTAMP), FACTORY.createSqlType(SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE), FACTORY.createSqlIntervalType( new SqlIntervalQualifier( - TimeUnit.YEAR, TimeUnit.YEAR, SqlParserPos.ZERO)), - FACTORY.createSqlIntervalType( - new SqlIntervalQualifier( - TimeUnit.YEAR, TimeUnit.MONTH, SqlParserPos.ZERO)), - FACTORY.createSqlIntervalType( - new SqlIntervalQualifier( - TimeUnit.MONTH, TimeUnit.MONTH, SqlParserPos.ZERO)), - FACTORY.createSqlIntervalType( - new SqlIntervalQualifier( - TimeUnit.DAY, TimeUnit.DAY, SqlParserPos.ZERO)), + TimeUnit.DAY, + 2, + TimeUnit.MINUTE, + RelDataType.PRECISION_NOT_SPECIFIED, + SqlParserPos.ZERO)), FACTORY.createSqlIntervalType( new SqlIntervalQualifier( - TimeUnit.DAY, TimeUnit.HOUR, SqlParserPos.ZERO)), + TimeUnit.DAY, 6, TimeUnit.SECOND, 9, SqlParserPos.ZERO)), FACTORY.createSqlIntervalType( new SqlIntervalQualifier( - TimeUnit.DAY, TimeUnit.MINUTE, SqlParserPos.ZERO)), + TimeUnit.HOUR, + RelDataType.PRECISION_NOT_SPECIFIED, + 
TimeUnit.SECOND, + 9, + SqlParserPos.ZERO)), FACTORY.createSqlIntervalType( new SqlIntervalQualifier( - TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO)), + TimeUnit.MINUTE, + RelDataType.PRECISION_NOT_SPECIFIED, + TimeUnit.SECOND, + 0, + SqlParserPos.ZERO)), FACTORY.createSqlIntervalType( new SqlIntervalQualifier( - TimeUnit.HOUR, TimeUnit.HOUR, SqlParserPos.ZERO)), - FACTORY.createSqlIntervalType( - new SqlIntervalQualifier( - TimeUnit.HOUR, TimeUnit.MINUTE, SqlParserPos.ZERO)), - FACTORY.createSqlIntervalType( - new SqlIntervalQualifier( - TimeUnit.HOUR, TimeUnit.SECOND, SqlParserPos.ZERO)), - FACTORY.createSqlIntervalType( - new SqlIntervalQualifier( - TimeUnit.MINUTE, TimeUnit.MINUTE, SqlParserPos.ZERO)), - FACTORY.createSqlIntervalType( - new SqlIntervalQualifier( - TimeUnit.MINUTE, TimeUnit.SECOND, SqlParserPos.ZERO)), - FACTORY.createSqlIntervalType( - new SqlIntervalQualifier( - TimeUnit.SECOND, TimeUnit.SECOND, SqlParserPos.ZERO)), + TimeUnit.SECOND, + RelDataType.PRECISION_NOT_SPECIFIED, + TimeUnit.SECOND, + 6, + SqlParserPos.ZERO)), FACTORY.createSqlType(SqlTypeName.CHAR), FACTORY.createSqlType(SqlTypeName.CHAR, 0), FACTORY.createSqlType(SqlTypeName.CHAR, 32), @@ -136,7 +165,6 @@ public class RelDataTypeJsonSerdeTest { FACTORY.createSqlType(SqlTypeName.VARBINARY, 0), FACTORY.createSqlType(SqlTypeName.VARBINARY, 1000), FACTORY.createSqlType(SqlTypeName.NULL), - FACTORY.createSqlType(SqlTypeName.ANY), FACTORY.createSqlType(SqlTypeName.SYMBOL), FACTORY.createMultisetType(FACTORY.createSqlType(SqlTypeName.VARCHAR), -1), FACTORY.createArrayType(FACTORY.createSqlType(SqlTypeName.VARCHAR, 16), -1), @@ -156,17 +184,16 @@ public class RelDataTypeJsonSerdeTest { FACTORY.createSqlType(SqlTypeName.INTEGER), FACTORY.createSqlType(SqlTypeName.VARCHAR, 10)), -1)), - FACTORY.createSqlType(SqlTypeName.DISTINCT), - FACTORY.createSqlType(SqlTypeName.STRUCTURED), // simple struct type FACTORY.createStructType( - StructKind.PEEK_FIELDS, + StructKind.PEEK_FIELDS_NO_EXPAND, 
Arrays.asList( FACTORY.createSqlType(SqlTypeName.INTEGER), - FACTORY.createSqlType(SqlTypeName.DECIMAL, 3, 10)), + FACTORY.createSqlType(SqlTypeName.DECIMAL, 10, 3)), Arrays.asList("f1", "f2")), // struct type with array type FACTORY.createStructType( + StructKind.PEEK_FIELDS_NO_EXPAND, Arrays.asList( FACTORY.createSqlType(SqlTypeName.VARCHAR), FACTORY.createArrayType( @@ -175,8 +202,10 @@ public class RelDataTypeJsonSerdeTest { Arrays.asList("f1", "f2")), // nested struct type FACTORY.createStructType( + StructKind.PEEK_FIELDS_NO_EXPAND, Arrays.asList( FACTORY.createStructType( + StructKind.PEEK_FIELDS_NO_EXPAND, Arrays.asList( FACTORY.createSqlType( SqlTypeName.VARCHAR, 5), @@ -187,13 +216,10 @@ public class RelDataTypeJsonSerdeTest { FACTORY.createSqlType(SqlTypeName.VARCHAR, 16), -1)), Arrays.asList("f3", "f4")), - FACTORY.createSqlType(SqlTypeName.SARG), FACTORY.createRowtimeIndicatorType(true, false), FACTORY.createRowtimeIndicatorType(true, true), FACTORY.createProctimeIndicatorType(true), FACTORY.createFieldTypeFromLogicalType( - new LegacyTypeInformationType<>(LogicalTypeRoot.RAW, Types.STRING)), - FACTORY.createFieldTypeFromLogicalType( StructuredType.newBuilder( ObjectIdentifier.of("cat", "db", "structuredType"), DataTypeJsonSerdeTest.PojoClass.class) @@ -213,79 +239,28 @@ public class RelDataTypeJsonSerdeTest { .description("description for StructuredType") .build())); - List<RelDataType> ret = new ArrayList<>(types.size() * 2); + final List<RelDataType> mutableTypes = new ArrayList<>(types.size() * 2); for (RelDataType type : types) { - ret.add(FACTORY.createTypeWithNullability(type, true)); - ret.add(FACTORY.createTypeWithNullability(type, false)); + mutableTypes.add(FACTORY.createTypeWithNullability(type, true)); + mutableTypes.add(FACTORY.createTypeWithNullability(type, false)); } - ret.add( + mutableTypes.add( FACTORY.createTypeWithNullability( FACTORY.createFieldTypeFromLogicalType( new RawType<>(true, Void.class, VoidSerializer.INSTANCE)), 
true)); - ret.add( + mutableTypes.add( FACTORY.createTypeWithNullability( FACTORY.createFieldTypeFromLogicalType( new RawType<>(false, Void.class, VoidSerializer.INSTANCE)), false)); - ret.add( + mutableTypes.add( FACTORY.createTypeWithNullability( FACTORY.createFieldTypeFromLogicalType( new RawType<>(true, Void.class, VoidSerializer.INSTANCE)), false)); - ret.add( - FACTORY.createTypeWithNullability( - FACTORY.createFieldTypeFromLogicalType( - new TypeInformationRawType<>(true, Types.STRING)), - true)); - ret.add( - FACTORY.createTypeWithNullability( - FACTORY.createFieldTypeFromLogicalType( - new TypeInformationRawType<>(false, Types.STRING)), - false)); - ret.add( - FACTORY.createTypeWithNullability( - FACTORY.createFieldTypeFromLogicalType( - new TypeInformationRawType<>(true, Types.STRING)), - false)); - return ret; - } - - @Parameterized.Parameter public RelDataType relDataType; - - @Test - public void testTypeSerde() throws IOException { - SerdeContext serdeCtx = - new SerdeContext( - new FlinkContextImpl( - false, - TableConfig.getDefault(), - new ModuleManager(), - null, - CatalogManagerMocks.createEmptyCatalogManager(), - null), - Thread.currentThread().getContextClassLoader(), - FlinkTypeFactory.INSTANCE(), - FlinkSqlOperatorTable.instance()); - ObjectReader objectReader = JsonSerdeUtil.createObjectReader(serdeCtx); - ObjectWriter objectWriter = JsonSerdeUtil.createObjectWriter(serdeCtx); - - final String json = objectWriter.writeValueAsString(relDataType); - - RelDataType actual = objectReader.readValue(json, RelDataType.class); - // type system will fill the default precision if the precision is not defined - if (relDataType.toString().equals("DECIMAL")) { - assertEquals(SqlTypeName.DECIMAL, actual.getSqlTypeName()); - assertEquals(relDataType.getScale(), actual.getScale()); - assertEquals( - serdeCtx.getTypeFactory() - .getTypeSystem() - .getDefaultPrecision(SqlTypeName.DECIMAL), - actual.getPrecision()); - } else { - assertSame(relDataType, actual); 
- } + return mutableTypes; } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeSerdeTest.java index becc4030..07a6ad2 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeSerdeTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeSerdeTest.java @@ -35,7 +35,6 @@ import org.apache.flink.table.planner.functions.utils.ScalarSqlFunction; import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; import org.apache.flink.table.utils.EncodingUtils; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter; @@ -60,7 +59,6 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import java.io.StringWriter; import java.math.BigDecimal; import java.util.Arrays; import java.util.List; @@ -96,12 +94,12 @@ public class RexNodeSerdeTest { RexBuilder rexBuilder = new RexBuilder(FACTORY); RelDataType inputType = FACTORY.createStructType( - StructKind.PEEK_FIELDS, + StructKind.PEEK_FIELDS_NO_EXPAND, Arrays.asList( FACTORY.createSqlType(SqlTypeName.INTEGER), FACTORY.createSqlType(SqlTypeName.BIGINT), FACTORY.createStructType( - StructKind.PEEK_FIELDS, + StructKind.PEEK_FIELDS_NO_EXPAND, Arrays.asList( FACTORY.createSqlType(SqlTypeName.VARCHAR), FACTORY.createSqlType(SqlTypeName.VARCHAR)), @@ -133,100 +131,34 @@ public class RexNodeSerdeTest { FACTORY.createSqlType(SqlTypeName.FLOAT)), rexBuilder.makeExactLiteral(BigDecimal.valueOf(random.nextDouble())), rexBuilder.makeIntervalLiteral( - new SqlIntervalQualifier( - 
TimeUnit.YEAR, TimeUnit.YEAR, SqlParserPos.ZERO)), - rexBuilder.makeIntervalLiteral( BigDecimal.valueOf(100), new SqlIntervalQualifier( - TimeUnit.YEAR, TimeUnit.YEAR, SqlParserPos.ZERO)), - rexBuilder.makeIntervalLiteral( - BigDecimal.valueOf(3), - new SqlIntervalQualifier( - TimeUnit.YEAR, TimeUnit.MONTH, SqlParserPos.ZERO)), - rexBuilder.makeIntervalLiteral( - new SqlIntervalQualifier( - TimeUnit.MONTH, TimeUnit.MONTH, SqlParserPos.ZERO)), - rexBuilder.makeIntervalLiteral( - BigDecimal.valueOf(3), - new SqlIntervalQualifier( - TimeUnit.MONTH, TimeUnit.MONTH, SqlParserPos.ZERO)), - rexBuilder.makeIntervalLiteral( - new SqlIntervalQualifier( - TimeUnit.DAY, TimeUnit.DAY, SqlParserPos.ZERO)), - rexBuilder.makeIntervalLiteral( - BigDecimal.valueOf(3), - new SqlIntervalQualifier( - TimeUnit.DAY, TimeUnit.DAY, SqlParserPos.ZERO)), - rexBuilder.makeIntervalLiteral( - new SqlIntervalQualifier( - TimeUnit.DAY, TimeUnit.HOUR, SqlParserPos.ZERO)), - rexBuilder.makeIntervalLiteral( - BigDecimal.valueOf(3), - new SqlIntervalQualifier( - TimeUnit.DAY, TimeUnit.HOUR, SqlParserPos.ZERO)), - rexBuilder.makeIntervalLiteral( - new SqlIntervalQualifier( - TimeUnit.DAY, TimeUnit.MINUTE, SqlParserPos.ZERO)), - rexBuilder.makeIntervalLiteral( - BigDecimal.valueOf(3), - new SqlIntervalQualifier( - TimeUnit.DAY, TimeUnit.MINUTE, SqlParserPos.ZERO)), - rexBuilder.makeIntervalLiteral( - new SqlIntervalQualifier( - TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO)), - rexBuilder.makeIntervalLiteral( - BigDecimal.valueOf(3), - new SqlIntervalQualifier( - TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO)), - rexBuilder.makeIntervalLiteral( - new SqlIntervalQualifier( - TimeUnit.HOUR, TimeUnit.HOUR, SqlParserPos.ZERO)), - rexBuilder.makeIntervalLiteral( - BigDecimal.valueOf(3), - new SqlIntervalQualifier( - TimeUnit.HOUR, TimeUnit.HOUR, SqlParserPos.ZERO)), - rexBuilder.makeIntervalLiteral( - new SqlIntervalQualifier( - TimeUnit.HOUR, TimeUnit.MINUTE, SqlParserPos.ZERO)), - 
rexBuilder.makeIntervalLiteral( - BigDecimal.valueOf(3), - new SqlIntervalQualifier( - TimeUnit.HOUR, TimeUnit.MINUTE, SqlParserPos.ZERO)), - rexBuilder.makeIntervalLiteral( - new SqlIntervalQualifier( - TimeUnit.HOUR, TimeUnit.SECOND, SqlParserPos.ZERO)), + TimeUnit.YEAR, + 4, + TimeUnit.YEAR, + RelDataType.PRECISION_NOT_SPECIFIED, + SqlParserPos.ZERO)), rexBuilder.makeIntervalLiteral( BigDecimal.valueOf(3), new SqlIntervalQualifier( - TimeUnit.HOUR, TimeUnit.SECOND, SqlParserPos.ZERO)), - rexBuilder.makeIntervalLiteral( - new SqlIntervalQualifier( - TimeUnit.MINUTE, TimeUnit.MINUTE, SqlParserPos.ZERO)), + TimeUnit.YEAR, + 2, + TimeUnit.MONTH, + RelDataType.PRECISION_NOT_SPECIFIED, + SqlParserPos.ZERO)), rexBuilder.makeIntervalLiteral( BigDecimal.valueOf(3), new SqlIntervalQualifier( - TimeUnit.MINUTE, TimeUnit.MINUTE, SqlParserPos.ZERO)), - rexBuilder.makeIntervalLiteral( - new SqlIntervalQualifier( - TimeUnit.MINUTE, TimeUnit.SECOND, SqlParserPos.ZERO)), + TimeUnit.DAY, 2, TimeUnit.SECOND, 6, SqlParserPos.ZERO)), rexBuilder.makeIntervalLiteral( BigDecimal.valueOf(3), new SqlIntervalQualifier( - TimeUnit.MINUTE, TimeUnit.SECOND, SqlParserPos.ZERO)), - rexBuilder.makeIntervalLiteral( - new SqlIntervalQualifier( - TimeUnit.SECOND, TimeUnit.SECOND, SqlParserPos.ZERO)), - rexBuilder.makeIntervalLiteral( - BigDecimal.valueOf(3), - new SqlIntervalQualifier( - TimeUnit.SECOND, TimeUnit.SECOND, SqlParserPos.ZERO)), + TimeUnit.SECOND, 2, TimeUnit.SECOND, 6, SqlParserPos.ZERO)), rexBuilder.makeDateLiteral(DateString.fromDaysSinceEpoch(10)), rexBuilder.makeDateLiteral(new DateString("2000-12-12")), rexBuilder.makeTimeLiteral(TimeString.fromMillisOfDay(1234), 3), rexBuilder.makeTimeLiteral(TimeString.fromMillisOfDay(123456), 6), rexBuilder.makeTimeLiteral(new TimeString("01:01:01.000000001"), 9), - rexBuilder.makeTimeWithLocalTimeZoneLiteral( - TimeString.fromMillisOfDay(1234), 3), rexBuilder.makeTimestampLiteral( TimestampString.fromMillisSinceEpoch(1234), 3), 
rexBuilder.makeTimestampLiteral( @@ -245,6 +177,7 @@ public class RexNodeSerdeTest { rexBuilder.makeLiteral( Arrays.<Object>asList(1, 2L), FACTORY.createStructType( + StructKind.PEEK_FIELDS_NO_EXPAND, Arrays.asList( FACTORY.createSqlType(SqlTypeName.INTEGER), FACTORY.createSqlType(SqlTypeName.BIGINT)), @@ -373,11 +306,7 @@ public class RexNodeSerdeTest { ObjectReader objectReader = JsonSerdeUtil.createObjectReader(serdeCtx); ObjectWriter objectWriter = JsonSerdeUtil.createObjectWriter(serdeCtx); - StringWriter writer = new StringWriter(100); - try (JsonGenerator gen = objectWriter.getFactory().createGenerator(writer)) { - gen.writeObject(rexNode); - } - String json = writer.toString(); + String json = objectWriter.writeValueAsString(rexNode); RexNode actual = objectReader.readValue(json, RexNode.class); assertEquals(rexNode, actual); } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundSerdeTest.java index 2a04c58..1f69163 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundSerdeTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundSerdeTest.java @@ -23,6 +23,7 @@ import org.apache.flink.table.module.ModuleManager; import org.apache.flink.table.planner.calcite.FlinkContextImpl; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable; +import org.apache.flink.table.utils.CatalogManagerMocks; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter; @@ -48,7 +49,7 @@ public class RexWindowBoundSerdeTest { 
TableConfig.getDefault(), new ModuleManager(), null, - null, + CatalogManagerMocks.createEmptyCatalogManager(), null), Thread.currentThread().getContextClassLoader(), FlinkTypeFactory.INSTANCE(), diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/TemporalTableSourceSpecSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/TemporalTableSourceSpecSerdeTest.java index 9ad7670..ff56411 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/TemporalTableSourceSpecSerdeTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/TemporalTableSourceSpecSerdeTest.java @@ -38,7 +38,6 @@ import org.apache.flink.table.planner.plan.schema.TableSourceTable; import org.apache.flink.table.planner.plan.stats.FlinkStatistic; import org.apache.flink.table.utils.CatalogManagerMocks; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter; @@ -47,7 +46,6 @@ import org.apache.calcite.sql.type.SqlTypeName; import org.junit.Test; import java.io.IOException; -import java.io.StringWriter; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -81,13 +79,9 @@ public class TemporalTableSourceSpecSerdeTest { ObjectReader objectReader = JsonSerdeUtil.createObjectReader(serdeCtx); ObjectWriter objectWriter = JsonSerdeUtil.createObjectWriter(serdeCtx); - StringWriter writer = new StringWriter(100); List<TemporalTableSourceSpec> specs = testData(); for (TemporalTableSourceSpec spec : specs) { - try (JsonGenerator gen = objectWriter.getFactory().createGenerator(writer)) { - gen.writeObject(spec); - } - String json = writer.toString(); + String 
json = objectWriter.writeValueAsString(spec); TemporalTableSourceSpec actual = objectReader.readValue(json, TemporalTableSourceSpec.class); assertEquals(spec.getTableSourceSpec(), actual.getTableSourceSpec()); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/typeutils/LogicalRelDataTypeConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/typeutils/LogicalRelDataTypeConverterTest.java new file mode 100644 index 0000000..7ef17fe --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/typeutils/LogicalRelDataTypeConverterTest.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.typeutils; + +import org.apache.flink.api.common.typeutils.base.LocalDateTimeSerializer; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.plan.nodes.exec.serde.DataTypeJsonSerdeTest; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.BinaryType; +import org.apache.flink.table.types.logical.BooleanType; +import org.apache.flink.table.types.logical.CharType; +import org.apache.flink.table.types.logical.DateType; +import org.apache.flink.table.types.logical.DayTimeIntervalType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.DoubleType; +import org.apache.flink.table.types.logical.FloatType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.MultisetType; +import org.apache.flink.table.types.logical.RawType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.SmallIntType; +import org.apache.flink.table.types.logical.StructuredType; +import org.apache.flink.table.types.logical.SymbolType; +import org.apache.flink.table.types.logical.TimeType; +import org.apache.flink.table.types.logical.TimestampKind; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.TinyIntType; +import org.apache.flink.table.types.logical.VarBinaryType; +import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.table.types.logical.YearMonthIntervalType; +import org.apache.flink.table.types.utils.DataTypeFactoryMock; + +import 
org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.stream.Stream;
+
+import static org.apache.flink.table.test.TableAssertions.assertThat;
+
+/** Tests for {@link LogicalRelDataTypeConverter}. */
+public class LogicalRelDataTypeConverterTest {
+
+    /**
+     * Converts each {@link LogicalType} to a {@link RelDataType} and back and expects the result
+     * to equal the input.
+     */
+    @ParameterizedTest(name = "{0}")
+    @MethodSource("testConversion")
+    public void testConversion(LogicalType logicalType) throws IOException {
+        final RelDataTypeFactory typeFactory = FlinkTypeFactory.INSTANCE();
+        final DataTypeFactoryMock dataTypeFactory = new DataTypeFactoryMock();
+        final RelDataType relDataType =
+                LogicalRelDataTypeConverter.toRelDataType(logicalType, typeFactory);
+        assertThat(LogicalRelDataTypeConverter.toLogicalType(relDataType, dataTypeFactory))
+                .isEqualTo(logicalType);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Test data
+    // --------------------------------------------------------------------------------------------
+
+    /** Provides the logical types that {@link #testConversion(LogicalType)} is run with. */
+    private static Stream<LogicalType> testConversion() {
+        return Stream.of(
+                new BooleanType(),
+                new TinyIntType(),
+                new SmallIntType(),
+                new IntType(),
+                new BigIntType(),
+                new FloatType(),
+                new DoubleType(),
+                new DecimalType(10),
+                new DecimalType(15, 5),
+                CharType.ofEmptyLiteral(),
+                new CharType(),
+                new CharType(5),
+                VarCharType.ofEmptyLiteral(),
+                new VarCharType(),
+                new VarCharType(5),
+                BinaryType.ofEmptyLiteral(),
+                new BinaryType(),
+                new BinaryType(100),
+                VarBinaryType.ofEmptyLiteral(),
+                new VarBinaryType(),
+                new VarBinaryType(100),
+                new DateType(),
+                new TimeType(),
+                new TimeType(3),
+                new TimestampType(),
+                new TimestampType(3),
+                new TimestampType(false, TimestampKind.ROWTIME, 3),
+                new LocalZonedTimestampType(),
+                new LocalZonedTimestampType(3),
+                new LocalZonedTimestampType(false, TimestampKind.PROCTIME, 3),
+                new LocalZonedTimestampType(false, TimestampKind.ROWTIME, 3),
+                new DayTimeIntervalType(DayTimeIntervalType.DayTimeResolution.DAY_TO_HOUR),
+                new DayTimeIntervalType(
+                        false, DayTimeIntervalType.DayTimeResolution.DAY_TO_HOUR, 3, 6),
+                new YearMonthIntervalType(YearMonthIntervalType.YearMonthResolution.YEAR_TO_MONTH),
+                new YearMonthIntervalType(
+                        false, YearMonthIntervalType.YearMonthResolution.MONTH, 2),
+                new SymbolType<>(),
+                new ArrayType(new IntType(false)),
+                new ArrayType(new LocalZonedTimestampType(false, TimestampKind.ROWTIME, 3)),
+                new ArrayType(new TimestampType()),
+                new ArrayType(CharType.ofEmptyLiteral()),
+                new ArrayType(VarCharType.ofEmptyLiteral()),
+                new ArrayType(BinaryType.ofEmptyLiteral()),
+                new ArrayType(VarBinaryType.ofEmptyLiteral()),
+                new MapType(new BigIntType(), new IntType(false)),
+                new MapType(
+                        new TimestampType(false, TimestampKind.ROWTIME, 3),
+                        new LocalZonedTimestampType()),
+                new MapType(CharType.ofEmptyLiteral(), CharType.ofEmptyLiteral()),
+                new MapType(VarCharType.ofEmptyLiteral(), VarCharType.ofEmptyLiteral()),
+                new MapType(BinaryType.ofEmptyLiteral(), BinaryType.ofEmptyLiteral()),
+                new MapType(VarBinaryType.ofEmptyLiteral(), VarBinaryType.ofEmptyLiteral()),
+                new MultisetType(new IntType(false)),
+                new MultisetType(new TimestampType()),
+                new MultisetType(new TimestampType(true, TimestampKind.ROWTIME, 3)),
+                new MultisetType(CharType.ofEmptyLiteral()),
+                new MultisetType(VarCharType.ofEmptyLiteral()),
+                new MultisetType(BinaryType.ofEmptyLiteral()),
+                new MultisetType(VarBinaryType.ofEmptyLiteral()),
+                RowType.of(new BigIntType(), new IntType(false), new VarCharType(200)),
+                RowType.of(
+                        new LogicalType[] {
+                            new BigIntType(), new IntType(false), new VarCharType(200)
+                        },
+                        new String[] {"f1", "f2", "f3"}),
+                RowType.of(
+                        new TimestampType(false, TimestampKind.ROWTIME, 3),
+                        new TimestampType(false, TimestampKind.REGULAR, 3),
+                        new LocalZonedTimestampType(false, TimestampKind.ROWTIME, 3),
+                        new LocalZonedTimestampType(false, TimestampKind.PROCTIME, 3),
+                        new LocalZonedTimestampType(false, TimestampKind.REGULAR, 3)),
+                RowType.of(
+                        CharType.ofEmptyLiteral(),
+                        VarCharType.ofEmptyLiteral(),
+                        BinaryType.ofEmptyLiteral(),
+                        VarBinaryType.ofEmptyLiteral()),
+                // registered structured type
+                StructuredType.newBuilder(
+                                ObjectIdentifier.of("cat", "db", "structuredType"),
+                                DataTypeJsonSerdeTest.PojoClass.class)
+                        .attributes(
+                                Arrays.asList(
+                                        new StructuredType.StructuredAttribute(
+                                                "f0", new IntType(true)),
+                                        new StructuredType.StructuredAttribute(
+                                                "f1", new BigIntType(true)),
+                                        new StructuredType.StructuredAttribute(
+                                                "f2", new VarCharType(200), "desc")))
+                        .comparison(StructuredType.StructuredComparison.FULL)
+                        .setFinal(false)
+                        .setInstantiable(false)
+                        .superType(
+                                StructuredType.newBuilder(
+                                                ObjectIdentifier.of("cat", "db", "structuredType2"))
+                                        .attributes(
+                                                Collections.singletonList(
+                                                        new StructuredType.StructuredAttribute(
+                                                                "f0", new BigIntType(false))))
+                                        .build())
+                        .description("description for StructuredType")
+                        .build(),
+                // unregistered structured type
+                StructuredType.newBuilder(PojoClass.class)
+                        .attributes(
+                                Arrays.asList(
+                                        new StructuredType.StructuredAttribute(
+                                                "f0", new IntType(true)),
+                                        new StructuredType.StructuredAttribute(
+                                                "f1", new BigIntType(true)),
+                                        new StructuredType.StructuredAttribute(
+                                                "f2", new VarCharType(200), "desc")))
+                        .build(),
+                // custom RawType
+                new RawType<>(LocalDateTime.class, LocalDateTimeSerializer.INSTANCE));
+    }
+
+    /** Simple POJO backing the unregistered structured type in the test data. */
+    public static class PojoClass {
+        public int f0;
+        public long f1;
+        public String f2;
+    }
+}