twalthr commented on a change in pull request #18274: URL: https://github.com/apache/flink/pull/18274#discussion_r780140261
########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/LogicalTypeDataTypeConverter.java ########## @@ -201,9 +200,29 @@ public DataType visit(RowType rowType) { @Override public DataType visit(DistinctType distinctType) { - return new FieldsDataType( - distinctType, - Collections.singletonList(distinctType.getSourceType().accept(this))); + final DataType sourceDataType = distinctType.getSourceType().accept(this); + if (sourceDataType instanceof AtomicDataType) { + return new AtomicDataType(distinctType, sourceDataType.getConversionClass()); + } else if (sourceDataType instanceof CollectionDataType) { + final CollectionDataType collectionDataType = (CollectionDataType) sourceDataType; + return new CollectionDataType( + distinctType, + collectionDataType.getConversionClass(), + collectionDataType.getElementDataType()); + } else if (sourceDataType instanceof KeyValueDataType) { + final KeyValueDataType keyValueDataType = (KeyValueDataType) sourceDataType; + return new KeyValueDataType( + distinctType, + keyValueDataType.getConversionClass(), + keyValueDataType.getKeyDataType(), + keyValueDataType.getValueDataType()); + } else if (sourceDataType instanceof FieldsDataType) { + return new FieldsDataType( + distinctType, + sourceDataType.getConversionClass(), + sourceDataType.getChildren()); + } + throw new IllegalStateException("Unexpected data type instance."); Review comment: The outer data type needs to preserve the original `distinctType` LogicalType. However, distinct types should behave similarly to the source type when it comes to accessing elements and fields. Therefore, they also need to have a corresponding DataType wrapper class such as `FieldsDataType` or `KeyValueDataType`. 
########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerializer.java ########## @@ -109,349 +123,358 @@ public void serialize( JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException { - if (logicalType instanceof CharType) { - // Zero-length character strings have no serializable string representation. - serializeRowType((CharType) logicalType, jsonGenerator); - } else if (logicalType instanceof VarCharType) { - // Zero-length character strings have no serializable string representation. - serializeVarCharType((VarCharType) logicalType, jsonGenerator); - } else if (logicalType instanceof BinaryType) { - // Zero-length binary strings have no serializable string representation. - serializeBinaryType((BinaryType) logicalType, jsonGenerator); - } else if (logicalType instanceof VarBinaryType) { - // Zero-length binary strings have no serializable string representation. - serializeVarBinaryType((VarBinaryType) logicalType, jsonGenerator); - } else if (logicalType instanceof SymbolType) { - // SymbolType does not support `asSerializableString` - serializeSymbolType((SymbolType<?>) logicalType, jsonGenerator); - } else if (logicalType instanceof TypeInformationRawType) { - // TypeInformationRawType does not support `asSerializableString` - serializeTypeInformationRawType((TypeInformationRawType<?>) logicalType, jsonGenerator); - } else if (logicalType instanceof StructuredType) { - // StructuredType does not full support `asSerializableString` - serializeStructuredType((StructuredType) logicalType, jsonGenerator); - } else if (logicalType instanceof DistinctType) { - // DistinctType does not full support `asSerializableString` - serializeDistinctType((DistinctType) logicalType, jsonGenerator); - } else if (logicalType instanceof TimestampType) { - // TimestampType does not consider `TimestampKind` - serializeTimestampType((TimestampType) logicalType, 
jsonGenerator); - } else if (logicalType instanceof ZonedTimestampType) { - // ZonedTimestampType does not consider `TimestampKind` - serializeZonedTimestampType((ZonedTimestampType) logicalType, jsonGenerator); - } else if (logicalType instanceof LocalZonedTimestampType) { - // LocalZonedTimestampType does not consider `TimestampKind` - serializeLocalZonedTimestampType((LocalZonedTimestampType) logicalType, jsonGenerator); - } else if (logicalType instanceof RowType) { - serializeRowType((RowType) logicalType, jsonGenerator, serializerProvider); - } else if (logicalType instanceof MapType) { - serializeMapType((MapType) logicalType, jsonGenerator, serializerProvider); - } else if (logicalType instanceof ArrayType) { - serializeArrayType((ArrayType) logicalType, jsonGenerator, serializerProvider); - } else if (logicalType instanceof MultisetType) { - serializeMultisetType((MultisetType) logicalType, jsonGenerator, serializerProvider); - } else if (logicalType instanceof RawType) { - serializeRawType((RawType<?>) logicalType, jsonGenerator, serializerProvider); - } else if (logicalType instanceof UnresolvedUserDefinedType) { - throw new TableException( - "Can not serialize an UnresolvedUserDefinedType instance. 
\n" - + "It needs to be resolved into a proper user-defined type.\""); - } else { - jsonGenerator.writeObject(logicalType.asSerializableString()); - } + final ReadableConfig config = SerdeContext.from(serializerProvider).getConfiguration(); + final boolean serializeCatalogObjects = + !config.get(TableConfigOptions.PLAN_COMPILE_CATALOG_OBJECTS) + .equals(CatalogPlanCompilation.IDENTIFIER); + serializeInternal(logicalType, jsonGenerator, serializeCatalogObjects); } - private void serializeRowType( - RowType rowType, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) + private static void serializeInternal( + LogicalType logicalType, JsonGenerator jsonGenerator, boolean serializeCatalogObjects) throws IOException { - jsonGenerator.writeStartObject(); - jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, rowType.getTypeRoot().name()); - jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, rowType.isNullable()); - List<RowType.RowField> fields = rowType.getFields(); - jsonGenerator.writeArrayFieldStart(FIELD_NAME_FIELDS); - for (RowType.RowField rowField : fields) { - jsonGenerator.writeStartObject(); - jsonGenerator.writeFieldName(rowField.getName()); - serialize(rowField.getType(), jsonGenerator, serializerProvider); - if (rowField.getDescription().isPresent()) { - jsonGenerator.writeStringField( - FIELD_NAME_DESCRIPTION, rowField.getDescription().get()); - } - jsonGenerator.writeEndObject(); + if (supportsCompactSerialization(logicalType, serializeCatalogObjects)) { + serializeTypeWithCompactSerialization(logicalType, jsonGenerator); + } else { + // fallback to generic serialization that might still use compact serialization for + // individual fields + serializeTypeWithGenericSerialization( + logicalType, jsonGenerator, serializeCatalogObjects); } - jsonGenerator.writeEndArray(); - jsonGenerator.writeEndObject(); } - private void serializeMapType( - MapType mapType, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) + // 
-------------------------------------------------------------------------------------------- + // Generic Serialization + // -------------------------------------------------------------------------------------------- + + private static void serializeTypeWithGenericSerialization( + LogicalType logicalType, JsonGenerator jsonGenerator, boolean serializeCatalogObjects) throws IOException { jsonGenerator.writeStartObject(); - jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, mapType.getTypeRoot().name()); - jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, mapType.isNullable()); - jsonGenerator.writeFieldName(FIELD_NAME_KEY_TYPE); - serialize(mapType.getKeyType(), jsonGenerator, serializerProvider); - jsonGenerator.writeFieldName(FIELD_NAME_VALUE_TYPE); - serialize(mapType.getValueType(), jsonGenerator, serializerProvider); + + jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, logicalType.getTypeRoot().name()); + if (!logicalType.isNullable()) { + jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, false); + } + + switch (logicalType.getTypeRoot()) { + case CHAR: + case VARCHAR: + case BINARY: + case VARBINARY: + serializeZeroLengthString(jsonGenerator); + break; + case TIMESTAMP_WITHOUT_TIME_ZONE: + final TimestampType timestampType = (TimestampType) logicalType; + serializeTimestamp( + timestampType.getPrecision(), timestampType.getKind(), jsonGenerator); + break; + case TIMESTAMP_WITH_TIME_ZONE: + final ZonedTimestampType zonedTimestampType = (ZonedTimestampType) logicalType; + serializeTimestamp( + zonedTimestampType.getPrecision(), + zonedTimestampType.getKind(), + jsonGenerator); + break; + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + final LocalZonedTimestampType localZonedTimestampType = + (LocalZonedTimestampType) logicalType; + serializeTimestamp( + localZonedTimestampType.getPrecision(), + localZonedTimestampType.getKind(), + jsonGenerator); + break; + case ARRAY: + serializeCollection( + ((ArrayType) logicalType).getElementType(), + jsonGenerator, + 
serializeCatalogObjects); + break; + case MULTISET: + serializeCollection( + ((MultisetType) logicalType).getElementType(), + jsonGenerator, + serializeCatalogObjects); + break; + case MAP: + serializeMap((MapType) logicalType, jsonGenerator, serializeCatalogObjects); + break; + case ROW: + serializeRow((RowType) logicalType, jsonGenerator, serializeCatalogObjects); + break; + case DISTINCT_TYPE: + serializeDistinctType( + (DistinctType) logicalType, jsonGenerator, serializeCatalogObjects); + break; + case STRUCTURED_TYPE: + serializeStructuredType( + (StructuredType) logicalType, jsonGenerator, serializeCatalogObjects); + break; + case SYMBOL: + // type root is enough + break; + case RAW: + if (logicalType instanceof RawType) { + serializeSpecializedRaw((RawType<?>) logicalType, jsonGenerator); + break; + } + // fall through + default: + throw new ValidationException( + String.format( + "Unable to serialize logical type '%s'. Please check the documentation for supported types.", + logicalType.asSummaryString())); + } + jsonGenerator.writeEndObject(); } - private void serializeArrayType( - ArrayType arrayType, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) - throws IOException { - jsonGenerator.writeStartObject(); - jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, arrayType.getTypeRoot().name()); - jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, arrayType.isNullable()); - jsonGenerator.writeFieldName(FIELD_NAME_ELEMENT_TYPE); - serialize(arrayType.getElementType(), jsonGenerator, serializerProvider); - jsonGenerator.writeEndObject(); + private static void serializeZeroLengthString(JsonGenerator jsonGenerator) throws IOException { + jsonGenerator.writeNumberField(FIELD_NAME_LENGTH, 0); } - private void serializeMultisetType( - MultisetType multisetType, - JsonGenerator jsonGenerator, - SerializerProvider serializerProvider) - throws IOException { - jsonGenerator.writeStartObject(); - jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, 
multisetType.getTypeRoot().name()); - jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, multisetType.isNullable()); - jsonGenerator.writeFieldName(FIELD_NAME_ELEMENT_TYPE); - serialize(multisetType.getElementType(), jsonGenerator, serializerProvider); - jsonGenerator.writeEndObject(); + private static void serializeTimestamp( + int precision, TimestampKind kind, JsonGenerator jsonGenerator) throws IOException { + jsonGenerator.writeNumberField(FIELD_NAME_PRECISION, precision); + jsonGenerator.writeObjectField(FIELD_NAME_TIMESTAMP_KIND, kind); } - private void serializeRowType(CharType charType, JsonGenerator jsonGenerator) + private static void serializeCollection( + LogicalType elementType, JsonGenerator jsonGenerator, boolean serializeCatalogObjects) throws IOException { - // Zero-length character strings have no serializable string representation. - if (charType.getLength() == CharType.EMPTY_LITERAL_LENGTH) { - jsonGenerator.writeStartObject(); - jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, charType.getTypeRoot().name()); - jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, charType.isNullable()); - jsonGenerator.writeNumberField(FIELD_NAME_LENGTH, 0); - jsonGenerator.writeEndObject(); - } else { - jsonGenerator.writeObject(charType.asSerializableString()); - } + jsonGenerator.writeFieldName(FIELD_NAME_ELEMENT_TYPE); + serializeInternal(elementType, jsonGenerator, serializeCatalogObjects); } - private void serializeVarCharType(VarCharType varCharType, JsonGenerator jsonGenerator) + private static void serializeMap( + MapType mapType, JsonGenerator jsonGenerator, boolean serializeCatalogObjects) throws IOException { - // Zero-length character strings have no serializable string representation. 
- if (varCharType.getLength() == VarCharType.EMPTY_LITERAL_LENGTH) { - jsonGenerator.writeStartObject(); - jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, varCharType.getTypeRoot().name()); - jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, varCharType.isNullable()); - jsonGenerator.writeNumberField(FIELD_NAME_LENGTH, 0); - jsonGenerator.writeEndObject(); - } else { - jsonGenerator.writeObject(varCharType.asSerializableString()); - } + jsonGenerator.writeFieldName(FIELD_NAME_KEY_TYPE); + serializeInternal(mapType.getKeyType(), jsonGenerator, serializeCatalogObjects); + jsonGenerator.writeFieldName(FIELD_NAME_VALUE_TYPE); + serializeInternal(mapType.getValueType(), jsonGenerator, serializeCatalogObjects); } - private void serializeBinaryType(BinaryType binaryType, JsonGenerator jsonGenerator) + private static void serializeRow( + RowType rowType, JsonGenerator jsonGenerator, boolean serializeCatalogObjects) throws IOException { - // Zero-length binary strings have no serializable string representation. 
- if (binaryType.getLength() == BinaryType.EMPTY_LITERAL_LENGTH) { + jsonGenerator.writeArrayFieldStart(FIELD_NAME_FIELDS); + for (RowType.RowField rowField : rowType.getFields()) { jsonGenerator.writeStartObject(); - jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, binaryType.getTypeRoot().name()); - jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, binaryType.isNullable()); - jsonGenerator.writeNumberField(FIELD_NAME_LENGTH, 0); + jsonGenerator.writeStringField(FIELD_NAME_FIELD_NAME, rowField.getName()); + jsonGenerator.writeFieldName(FIELD_NAME_FIELD_TYPE); + serializeInternal(rowField.getType(), jsonGenerator, serializeCatalogObjects); + if (rowField.getDescription().isPresent()) { + jsonGenerator.writeStringField( + FIELD_NAME_FIELD_DESCRIPTION, rowField.getDescription().get()); + } jsonGenerator.writeEndObject(); - } else { - jsonGenerator.writeObject(binaryType.asSerializableString()); } + jsonGenerator.writeEndArray(); } - private void serializeVarBinaryType(VarBinaryType varBinaryType, JsonGenerator jsonGenerator) + private static void serializeDistinctType( + DistinctType distinctType, JsonGenerator jsonGenerator, boolean serializeCatalogObjects) throws IOException { - // Zero-length binary strings have no serializable string representation. 
- if (varBinaryType.getLength() == VarBinaryType.EMPTY_LITERAL_LENGTH) { - jsonGenerator.writeStartObject(); + jsonGenerator.writeObjectField( + FIELD_NAME_OBJECT_IDENTIFIER, + distinctType.getObjectIdentifier().orElseThrow(IllegalStateException::new)); + if (distinctType.getDescription().isPresent()) { jsonGenerator.writeStringField( - FIELD_NAME_TYPE_NAME, varBinaryType.getTypeRoot().name()); - jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, varBinaryType.isNullable()); - jsonGenerator.writeNumberField(FIELD_NAME_LENGTH, 0); - jsonGenerator.writeEndObject(); - } else { - jsonGenerator.writeObject(varBinaryType.asSerializableString()); + FIELD_NAME_FIELD_DESCRIPTION, distinctType.getDescription().get()); } + jsonGenerator.writeFieldName(FIELD_NAME_SOURCE_TYPE); + serializeInternal(distinctType.getSourceType(), jsonGenerator, serializeCatalogObjects); } - private void serializeSymbolType(SymbolType<?> symbolType, JsonGenerator jsonGenerator) - throws IOException { - jsonGenerator.writeStartObject(); - jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, symbolType.isNullable()); - jsonGenerator.writeStringField( - FIELD_NAME_SYMBOL_CLASS, symbolType.getDefaultConversion().getName()); - jsonGenerator.writeEndObject(); - } - - private void serializeTypeInformationRawType( - TypeInformationRawType<?> rawType, JsonGenerator jsonGenerator) throws IOException { - jsonGenerator.writeStartObject(); - jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, rawType.isNullable()); - jsonGenerator.writeStringField( - FIELD_NAME_TYPE_INFO, - EncodingUtils.encodeObjectToString(rawType.getTypeInformation())); - jsonGenerator.writeEndObject(); - } - - private void serializeStructuredType(StructuredType structuredType, JsonGenerator jsonGenerator) + private static void serializeStructuredType( + StructuredType structuredType, + JsonGenerator jsonGenerator, + boolean serializeCatalogObjects) throws IOException { - jsonGenerator.writeStartObject(); - jsonGenerator.writeStringField( 
- FIELD_NAME_TYPE_NAME, LogicalTypeRoot.STRUCTURED_TYPE.name()); - jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, structuredType.isNullable()); if (structuredType.getObjectIdentifier().isPresent()) { jsonGenerator.writeObjectField( - FIELD_NAME_IDENTIFIER, structuredType.getObjectIdentifier().get()); + FIELD_NAME_OBJECT_IDENTIFIER, structuredType.getObjectIdentifier().get()); } - if (structuredType.getImplementationClass().isPresent()) { + if (structuredType.getDescription().isPresent()) { jsonGenerator.writeStringField( - FIELD_NAME_IMPLEMENTATION_CLASS, - structuredType.getImplementationClass().get().getName()); + FIELD_NAME_DESCRIPTION, structuredType.getDescription().get()); + } + if (structuredType.getImplementationClass().isPresent()) { + jsonGenerator.writeObjectField( + FIELD_NAME_IMPLEMENTATION_CLASS, structuredType.getImplementationClass().get()); } jsonGenerator.writeFieldName(FIELD_NAME_ATTRIBUTES); jsonGenerator.writeStartArray(); - for (StructuredType.StructuredAttribute attribute : structuredType.getAttributes()) { + for (StructuredAttribute attribute : structuredType.getAttributes()) { jsonGenerator.writeStartObject(); - jsonGenerator.writeStringField(FIELD_NAME_NAME, attribute.getName()); - jsonGenerator.writeObjectField(FIELD_NAME_LOGICAL_TYPE, attribute.getType()); + jsonGenerator.writeStringField(FIELD_NAME_ATTRIBUTE_NAME, attribute.getName()); + jsonGenerator.writeFieldName(FIELD_NAME_ATTRIBUTE_TYPE); + serializeInternal(attribute.getType(), jsonGenerator, serializeCatalogObjects); if (attribute.getDescription().isPresent()) { jsonGenerator.writeStringField( - FIELD_NAME_DESCRIPTION, attribute.getDescription().get()); + FIELD_NAME_ATTRIBUTE_DESCRIPTION, attribute.getDescription().get()); } jsonGenerator.writeEndObject(); } jsonGenerator.writeEndArray(); - jsonGenerator.writeBooleanField(FIELD_NAME_FINAL, structuredType.isFinal()); - jsonGenerator.writeBooleanField(FIELD_NAME_INSTANTIABLE, structuredType.isInstantiable()); - 
jsonGenerator.writeStringField( - FIELD_NAME_COMPARISON, structuredType.getComparison().name()); - if (structuredType.getSuperType().isPresent()) { - jsonGenerator.writeObjectField( - FIELD_NAME_SUPPER_TYPE, structuredType.getSuperType().get()); + if (!structuredType.isFinal()) { + jsonGenerator.writeBooleanField(FIELD_NAME_FINAL, false); } - if (structuredType.getDescription().isPresent()) { + if (!structuredType.isInstantiable()) { + jsonGenerator.writeBooleanField(FIELD_NAME_INSTANTIABLE, false); + } + if (structuredType.getComparison() != StructuredComparison.NONE) { jsonGenerator.writeStringField( - FIELD_NAME_DESCRIPTION, structuredType.getDescription().get()); + FIELD_NAME_COMPARISON, structuredType.getComparison().name()); + } + if (structuredType.getSuperType().isPresent()) { + jsonGenerator.writeObjectField( + FIELD_NAME_SUPER_TYPE, structuredType.getSuperType().get()); } - jsonGenerator.writeEndObject(); } - private void serializeDistinctType(DistinctType distinctType, JsonGenerator jsonGenerator) + private static void serializeSpecializedRaw(RawType<?> rawType, JsonGenerator jsonGenerator) throws IOException { - jsonGenerator.writeStartObject(); - jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, LogicalTypeRoot.DISTINCT_TYPE.name()); - Preconditions.checkArgument(distinctType.getObjectIdentifier().isPresent()); - jsonGenerator.writeObjectField( - FIELD_NAME_IDENTIFIER, distinctType.getObjectIdentifier().get()); - jsonGenerator.writeObjectField(FIELD_NAME_SOURCE_TYPE, distinctType.getSourceType()); - if (distinctType.getDescription().isPresent()) { + jsonGenerator.writeStringField(FIELD_NAME_CLASS, rawType.getOriginatingClass().getName()); + final TypeSerializer<?> serializer = rawType.getTypeSerializer(); + if (serializer.equals(NullSerializer.INSTANCE)) { jsonGenerator.writeStringField( - FIELD_NAME_DESCRIPTION, distinctType.getDescription().get()); + FIELD_NAME_SPECIAL_SERIALIZER, FIELD_VALUE_EXTERNAL_SERIALIZER_NULL); + } else if (serializer 
instanceof ExternalSerializer) { + final ExternalSerializer<?, ?> externalSerializer = + (ExternalSerializer<?, ?>) rawType.getTypeSerializer(); + if (externalSerializer.isInternalInput()) { + throw new TableException( + "Asymmetric external serializers are currently not supported. " + + "The input must not be internal if the output is external."); + } + jsonGenerator.writeObjectField( + FIELD_NAME_EXTERNAL_DATA_TYPE, externalSerializer.getDataType()); + } else { + throw new TableException("Unsupported special case for RAW type."); } - jsonGenerator.writeEndObject(); } - private void serializeTimestampType(TimestampType timestampType, JsonGenerator jsonGenerator) - throws IOException { - jsonGenerator.writeStartObject(); - jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, timestampType.getTypeRoot().name()); - jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, timestampType.isNullable()); - jsonGenerator.writeNumberField(FIELD_NAME_PRECISION, timestampType.getPrecision()); - jsonGenerator.writeObjectField(FIELD_NAME_TIMESTAMP_KIND, timestampType.getKind()); - jsonGenerator.writeEndObject(); - } + // -------------------------------------------------------------------------------------------- + // Compact Serialization + // -------------------------------------------------------------------------------------------- - private void serializeZonedTimestampType( - ZonedTimestampType timestampType, JsonGenerator jsonGenerator) throws IOException { - jsonGenerator.writeStartObject(); - jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, timestampType.getTypeRoot().name()); - jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, timestampType.isNullable()); - jsonGenerator.writeNumberField(FIELD_NAME_PRECISION, timestampType.getPrecision()); - jsonGenerator.writeObjectField(FIELD_NAME_TIMESTAMP_KIND, timestampType.getKind()); - jsonGenerator.writeEndObject(); + private static boolean supportsCompactSerialization( + LogicalType logicalType, boolean 
serializeCatalogObjects) { + return logicalType.accept(new CompactSerializationChecker(serializeCatalogObjects)); } - private void serializeLocalZonedTimestampType( - LocalZonedTimestampType timestampType, JsonGenerator jsonGenerator) throws IOException { - jsonGenerator.writeStartObject(); - jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, timestampType.getTypeRoot().name()); - jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, timestampType.isNullable()); - jsonGenerator.writeNumberField(FIELD_NAME_PRECISION, timestampType.getPrecision()); - jsonGenerator.writeObjectField(FIELD_NAME_TIMESTAMP_KIND, timestampType.getKind()); - jsonGenerator.writeEndObject(); + private static void serializeTypeWithCompactSerialization( + LogicalType logicalType, JsonGenerator jsonGenerator) throws IOException { + final String compactString = logicalType.asSerializableString(); + jsonGenerator.writeString(compactString); } - @SuppressWarnings("rawtypes") - private void serializeRawType( - RawType<?> rawType, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) - throws IOException { - TypeSerializer<?> typeSer = rawType.getTypeSerializer(); - if (typeSer instanceof ExternalSerializer) { - ExternalSerializer externalSer = (ExternalSerializer) typeSer; - // Currently, ExternalSerializer with `isInternalInput=false` will be serialized, - // Once `isInternalInput=true` needs to be serialized, we can add individual field in - // the json to support it, and the new json plan is compatible with the previous one. 
- if (externalSer.isInternalInput()) { - throw new TableException( - "ExternalSerializer with `isInternalInput=true` is not supported."); - } - DataType dataType = externalSer.getDataType(); - boolean isMapView = DataViewUtils.isMapViewDataType(dataType); - boolean isListView = DataViewUtils.isListViewDataType(dataType); - if (isMapView || isListView) { - jsonGenerator.writeStartObject(); - jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, LogicalTypeRoot.RAW.name()); - jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, rawType.isNullable()); - if (isMapView) { - jsonGenerator.writeStringField( - FIELD_NAME_DATA_VIEW_CLASS, MapView.class.getName()); - KeyValueDataType keyValueDataType = - DataViewUtils.extractKeyValueDataTypeForMapView(dataType); - serializeDataTypeForDataView( - FIELD_NAME_KEY_TYPE, - keyValueDataType.getKeyDataType(), - jsonGenerator, - serializerProvider); - serializeDataTypeForDataView( - FIELD_NAME_VALUE_TYPE, - keyValueDataType.getValueDataType(), - jsonGenerator, - serializerProvider); - } else { - jsonGenerator.writeStringField( - FIELD_NAME_DATA_VIEW_CLASS, ListView.class.getName()); - DataType elementType = - DataViewUtils.extractElementDataTypeForListView(dataType); - serializeDataTypeForDataView( - FIELD_NAME_ELEMENT_TYPE, - elementType, - jsonGenerator, - serializerProvider); - } - jsonGenerator.writeEndObject(); - return; - } + /** + * Checks whether the given type can be serialized as a compact string created from {@link + * LogicalType#asSerializableString()}. + */ + private static class CompactSerializationChecker extends LogicalTypeDefaultVisitor<Boolean> { + + private final boolean serializeCatalogObjects; + + CompactSerializationChecker(boolean serializeCatalogObjects) { + this.serializeCatalogObjects = serializeCatalogObjects; } - jsonGenerator.writeObject(rawType.asSerializableString()); - } + @Override + public Boolean visit(CharType charType) { + return charType.getLength() > 0; Review comment: Unfortunately not. 
It is invalid in declarations but valid within a plan if you have something like `SELECT ""`. ########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerializer.java ########## @@ -109,349 +123,358 @@ public void serialize( JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException { - if (logicalType instanceof CharType) { - // Zero-length character strings have no serializable string representation. - serializeRowType((CharType) logicalType, jsonGenerator); - } else if (logicalType instanceof VarCharType) { - // Zero-length character strings have no serializable string representation. - serializeVarCharType((VarCharType) logicalType, jsonGenerator); - } else if (logicalType instanceof BinaryType) { - // Zero-length binary strings have no serializable string representation. - serializeBinaryType((BinaryType) logicalType, jsonGenerator); - } else if (logicalType instanceof VarBinaryType) { - // Zero-length binary strings have no serializable string representation. 
- serializeVarBinaryType((VarBinaryType) logicalType, jsonGenerator); - } else if (logicalType instanceof SymbolType) { - // SymbolType does not support `asSerializableString` - serializeSymbolType((SymbolType<?>) logicalType, jsonGenerator); - } else if (logicalType instanceof TypeInformationRawType) { - // TypeInformationRawType does not support `asSerializableString` - serializeTypeInformationRawType((TypeInformationRawType<?>) logicalType, jsonGenerator); - } else if (logicalType instanceof StructuredType) { - // StructuredType does not full support `asSerializableString` - serializeStructuredType((StructuredType) logicalType, jsonGenerator); - } else if (logicalType instanceof DistinctType) { - // DistinctType does not full support `asSerializableString` - serializeDistinctType((DistinctType) logicalType, jsonGenerator); - } else if (logicalType instanceof TimestampType) { - // TimestampType does not consider `TimestampKind` - serializeTimestampType((TimestampType) logicalType, jsonGenerator); - } else if (logicalType instanceof ZonedTimestampType) { - // ZonedTimestampType does not consider `TimestampKind` - serializeZonedTimestampType((ZonedTimestampType) logicalType, jsonGenerator); - } else if (logicalType instanceof LocalZonedTimestampType) { - // LocalZonedTimestampType does not consider `TimestampKind` - serializeLocalZonedTimestampType((LocalZonedTimestampType) logicalType, jsonGenerator); - } else if (logicalType instanceof RowType) { - serializeRowType((RowType) logicalType, jsonGenerator, serializerProvider); - } else if (logicalType instanceof MapType) { - serializeMapType((MapType) logicalType, jsonGenerator, serializerProvider); - } else if (logicalType instanceof ArrayType) { - serializeArrayType((ArrayType) logicalType, jsonGenerator, serializerProvider); - } else if (logicalType instanceof MultisetType) { - serializeMultisetType((MultisetType) logicalType, jsonGenerator, serializerProvider); - } else if (logicalType instanceof RawType) { - 
serializeRawType((RawType<?>) logicalType, jsonGenerator, serializerProvider); - } else if (logicalType instanceof UnresolvedUserDefinedType) { - throw new TableException( - "Can not serialize an UnresolvedUserDefinedType instance. \n" - + "It needs to be resolved into a proper user-defined type.\""); - } else { - jsonGenerator.writeObject(logicalType.asSerializableString()); - } + final ReadableConfig config = SerdeContext.from(serializerProvider).getConfiguration(); + final boolean serializeCatalogObjects = + !config.get(TableConfigOptions.PLAN_COMPILE_CATALOG_OBJECTS) + .equals(CatalogPlanCompilation.IDENTIFIER); + serializeInternal(logicalType, jsonGenerator, serializeCatalogObjects); } - private void serializeRowType( - RowType rowType, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) + private static void serializeInternal( + LogicalType logicalType, JsonGenerator jsonGenerator, boolean serializeCatalogObjects) throws IOException { - jsonGenerator.writeStartObject(); - jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, rowType.getTypeRoot().name()); - jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, rowType.isNullable()); - List<RowType.RowField> fields = rowType.getFields(); - jsonGenerator.writeArrayFieldStart(FIELD_NAME_FIELDS); - for (RowType.RowField rowField : fields) { - jsonGenerator.writeStartObject(); - jsonGenerator.writeFieldName(rowField.getName()); - serialize(rowField.getType(), jsonGenerator, serializerProvider); - if (rowField.getDescription().isPresent()) { - jsonGenerator.writeStringField( - FIELD_NAME_DESCRIPTION, rowField.getDescription().get()); - } - jsonGenerator.writeEndObject(); + if (supportsCompactSerialization(logicalType, serializeCatalogObjects)) { + serializeTypeWithCompactSerialization(logicalType, jsonGenerator); + } else { + // fallback to generic serialization that might still use compact serialization for + // individual fields + serializeTypeWithGenericSerialization( + logicalType, jsonGenerator, 
serializeCatalogObjects); } - jsonGenerator.writeEndArray(); - jsonGenerator.writeEndObject(); } - private void serializeMapType( - MapType mapType, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) + // -------------------------------------------------------------------------------------------- + // Generic Serialization + // -------------------------------------------------------------------------------------------- + + private static void serializeTypeWithGenericSerialization( + LogicalType logicalType, JsonGenerator jsonGenerator, boolean serializeCatalogObjects) throws IOException { jsonGenerator.writeStartObject(); - jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, mapType.getTypeRoot().name()); - jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, mapType.isNullable()); - jsonGenerator.writeFieldName(FIELD_NAME_KEY_TYPE); - serialize(mapType.getKeyType(), jsonGenerator, serializerProvider); - jsonGenerator.writeFieldName(FIELD_NAME_VALUE_TYPE); - serialize(mapType.getValueType(), jsonGenerator, serializerProvider); + + jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, logicalType.getTypeRoot().name()); + if (!logicalType.isNullable()) { + jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, false); + } + + switch (logicalType.getTypeRoot()) { + case CHAR: + case VARCHAR: + case BINARY: + case VARBINARY: + serializeZeroLengthString(jsonGenerator); + break; + case TIMESTAMP_WITHOUT_TIME_ZONE: + final TimestampType timestampType = (TimestampType) logicalType; + serializeTimestamp( + timestampType.getPrecision(), timestampType.getKind(), jsonGenerator); + break; + case TIMESTAMP_WITH_TIME_ZONE: + final ZonedTimestampType zonedTimestampType = (ZonedTimestampType) logicalType; + serializeTimestamp( + zonedTimestampType.getPrecision(), + zonedTimestampType.getKind(), + jsonGenerator); + break; + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + final LocalZonedTimestampType localZonedTimestampType = + (LocalZonedTimestampType) logicalType; + 
serializeTimestamp( + localZonedTimestampType.getPrecision(), + localZonedTimestampType.getKind(), + jsonGenerator); + break; + case ARRAY: + serializeCollection( + ((ArrayType) logicalType).getElementType(), + jsonGenerator, + serializeCatalogObjects); + break; + case MULTISET: + serializeCollection( + ((MultisetType) logicalType).getElementType(), + jsonGenerator, + serializeCatalogObjects); + break; + case MAP: + serializeMap((MapType) logicalType, jsonGenerator, serializeCatalogObjects); + break; + case ROW: + serializeRow((RowType) logicalType, jsonGenerator, serializeCatalogObjects); + break; + case DISTINCT_TYPE: + serializeDistinctType( + (DistinctType) logicalType, jsonGenerator, serializeCatalogObjects); + break; + case STRUCTURED_TYPE: + serializeStructuredType( + (StructuredType) logicalType, jsonGenerator, serializeCatalogObjects); + break; + case SYMBOL: + // type root is enough + break; + case RAW: + if (logicalType instanceof RawType) { + serializeSpecializedRaw((RawType<?>) logicalType, jsonGenerator); + break; + } + // fall through + default: + throw new ValidationException( + String.format( + "Unable to serialize logical type '%s'. 
Please check the documentation for supported types.", + logicalType.asSummaryString())); + } + jsonGenerator.writeEndObject(); } - private void serializeArrayType( - ArrayType arrayType, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) - throws IOException { - jsonGenerator.writeStartObject(); - jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, arrayType.getTypeRoot().name()); - jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, arrayType.isNullable()); - jsonGenerator.writeFieldName(FIELD_NAME_ELEMENT_TYPE); - serialize(arrayType.getElementType(), jsonGenerator, serializerProvider); - jsonGenerator.writeEndObject(); + private static void serializeZeroLengthString(JsonGenerator jsonGenerator) throws IOException { + jsonGenerator.writeNumberField(FIELD_NAME_LENGTH, 0); } - private void serializeMultisetType( - MultisetType multisetType, - JsonGenerator jsonGenerator, - SerializerProvider serializerProvider) - throws IOException { - jsonGenerator.writeStartObject(); - jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, multisetType.getTypeRoot().name()); - jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, multisetType.isNullable()); - jsonGenerator.writeFieldName(FIELD_NAME_ELEMENT_TYPE); - serialize(multisetType.getElementType(), jsonGenerator, serializerProvider); - jsonGenerator.writeEndObject(); + private static void serializeTimestamp( + int precision, TimestampKind kind, JsonGenerator jsonGenerator) throws IOException { + jsonGenerator.writeNumberField(FIELD_NAME_PRECISION, precision); + jsonGenerator.writeObjectField(FIELD_NAME_TIMESTAMP_KIND, kind); } - private void serializeRowType(CharType charType, JsonGenerator jsonGenerator) + private static void serializeCollection( + LogicalType elementType, JsonGenerator jsonGenerator, boolean serializeCatalogObjects) throws IOException { - // Zero-length character strings have no serializable string representation. 
- if (charType.getLength() == CharType.EMPTY_LITERAL_LENGTH) { - jsonGenerator.writeStartObject(); - jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, charType.getTypeRoot().name()); - jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, charType.isNullable()); - jsonGenerator.writeNumberField(FIELD_NAME_LENGTH, 0); - jsonGenerator.writeEndObject(); - } else { - jsonGenerator.writeObject(charType.asSerializableString()); - } + jsonGenerator.writeFieldName(FIELD_NAME_ELEMENT_TYPE); + serializeInternal(elementType, jsonGenerator, serializeCatalogObjects); } - private void serializeVarCharType(VarCharType varCharType, JsonGenerator jsonGenerator) + private static void serializeMap( + MapType mapType, JsonGenerator jsonGenerator, boolean serializeCatalogObjects) throws IOException { - // Zero-length character strings have no serializable string representation. - if (varCharType.getLength() == VarCharType.EMPTY_LITERAL_LENGTH) { - jsonGenerator.writeStartObject(); - jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, varCharType.getTypeRoot().name()); - jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, varCharType.isNullable()); - jsonGenerator.writeNumberField(FIELD_NAME_LENGTH, 0); - jsonGenerator.writeEndObject(); - } else { - jsonGenerator.writeObject(varCharType.asSerializableString()); - } + jsonGenerator.writeFieldName(FIELD_NAME_KEY_TYPE); + serializeInternal(mapType.getKeyType(), jsonGenerator, serializeCatalogObjects); + jsonGenerator.writeFieldName(FIELD_NAME_VALUE_TYPE); + serializeInternal(mapType.getValueType(), jsonGenerator, serializeCatalogObjects); } - private void serializeBinaryType(BinaryType binaryType, JsonGenerator jsonGenerator) + private static void serializeRow( + RowType rowType, JsonGenerator jsonGenerator, boolean serializeCatalogObjects) throws IOException { - // Zero-length binary strings have no serializable string representation. 
- if (binaryType.getLength() == BinaryType.EMPTY_LITERAL_LENGTH) { + jsonGenerator.writeArrayFieldStart(FIELD_NAME_FIELDS); + for (RowType.RowField rowField : rowType.getFields()) { jsonGenerator.writeStartObject(); - jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, binaryType.getTypeRoot().name()); - jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, binaryType.isNullable()); - jsonGenerator.writeNumberField(FIELD_NAME_LENGTH, 0); + jsonGenerator.writeStringField(FIELD_NAME_FIELD_NAME, rowField.getName()); + jsonGenerator.writeFieldName(FIELD_NAME_FIELD_TYPE); + serializeInternal(rowField.getType(), jsonGenerator, serializeCatalogObjects); + if (rowField.getDescription().isPresent()) { + jsonGenerator.writeStringField( + FIELD_NAME_FIELD_DESCRIPTION, rowField.getDescription().get()); + } jsonGenerator.writeEndObject(); - } else { - jsonGenerator.writeObject(binaryType.asSerializableString()); } + jsonGenerator.writeEndArray(); } - private void serializeVarBinaryType(VarBinaryType varBinaryType, JsonGenerator jsonGenerator) + private static void serializeDistinctType( + DistinctType distinctType, JsonGenerator jsonGenerator, boolean serializeCatalogObjects) throws IOException { - // Zero-length binary strings have no serializable string representation. 
- if (varBinaryType.getLength() == VarBinaryType.EMPTY_LITERAL_LENGTH) { - jsonGenerator.writeStartObject(); + jsonGenerator.writeObjectField( + FIELD_NAME_OBJECT_IDENTIFIER, + distinctType.getObjectIdentifier().orElseThrow(IllegalStateException::new)); + if (distinctType.getDescription().isPresent()) { jsonGenerator.writeStringField( - FIELD_NAME_TYPE_NAME, varBinaryType.getTypeRoot().name()); - jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, varBinaryType.isNullable()); - jsonGenerator.writeNumberField(FIELD_NAME_LENGTH, 0); - jsonGenerator.writeEndObject(); - } else { - jsonGenerator.writeObject(varBinaryType.asSerializableString()); + FIELD_NAME_FIELD_DESCRIPTION, distinctType.getDescription().get()); } + jsonGenerator.writeFieldName(FIELD_NAME_SOURCE_TYPE); + serializeInternal(distinctType.getSourceType(), jsonGenerator, serializeCatalogObjects); } - private void serializeSymbolType(SymbolType<?> symbolType, JsonGenerator jsonGenerator) - throws IOException { - jsonGenerator.writeStartObject(); - jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, symbolType.isNullable()); - jsonGenerator.writeStringField( - FIELD_NAME_SYMBOL_CLASS, symbolType.getDefaultConversion().getName()); - jsonGenerator.writeEndObject(); - } - - private void serializeTypeInformationRawType( - TypeInformationRawType<?> rawType, JsonGenerator jsonGenerator) throws IOException { - jsonGenerator.writeStartObject(); - jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, rawType.isNullable()); - jsonGenerator.writeStringField( - FIELD_NAME_TYPE_INFO, - EncodingUtils.encodeObjectToString(rawType.getTypeInformation())); - jsonGenerator.writeEndObject(); - } - - private void serializeStructuredType(StructuredType structuredType, JsonGenerator jsonGenerator) + private static void serializeStructuredType( + StructuredType structuredType, + JsonGenerator jsonGenerator, + boolean serializeCatalogObjects) throws IOException { - jsonGenerator.writeStartObject(); - jsonGenerator.writeStringField( 
- FIELD_NAME_TYPE_NAME, LogicalTypeRoot.STRUCTURED_TYPE.name()); - jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, structuredType.isNullable()); if (structuredType.getObjectIdentifier().isPresent()) { jsonGenerator.writeObjectField( - FIELD_NAME_IDENTIFIER, structuredType.getObjectIdentifier().get()); + FIELD_NAME_OBJECT_IDENTIFIER, structuredType.getObjectIdentifier().get()); } - if (structuredType.getImplementationClass().isPresent()) { + if (structuredType.getDescription().isPresent()) { jsonGenerator.writeStringField( - FIELD_NAME_IMPLEMENTATION_CLASS, - structuredType.getImplementationClass().get().getName()); + FIELD_NAME_DESCRIPTION, structuredType.getDescription().get()); + } + if (structuredType.getImplementationClass().isPresent()) { + jsonGenerator.writeObjectField( + FIELD_NAME_IMPLEMENTATION_CLASS, structuredType.getImplementationClass().get()); } jsonGenerator.writeFieldName(FIELD_NAME_ATTRIBUTES); jsonGenerator.writeStartArray(); - for (StructuredType.StructuredAttribute attribute : structuredType.getAttributes()) { + for (StructuredAttribute attribute : structuredType.getAttributes()) { jsonGenerator.writeStartObject(); - jsonGenerator.writeStringField(FIELD_NAME_NAME, attribute.getName()); - jsonGenerator.writeObjectField(FIELD_NAME_LOGICAL_TYPE, attribute.getType()); + jsonGenerator.writeStringField(FIELD_NAME_ATTRIBUTE_NAME, attribute.getName()); + jsonGenerator.writeFieldName(FIELD_NAME_ATTRIBUTE_TYPE); + serializeInternal(attribute.getType(), jsonGenerator, serializeCatalogObjects); if (attribute.getDescription().isPresent()) { jsonGenerator.writeStringField( - FIELD_NAME_DESCRIPTION, attribute.getDescription().get()); + FIELD_NAME_ATTRIBUTE_DESCRIPTION, attribute.getDescription().get()); } jsonGenerator.writeEndObject(); } jsonGenerator.writeEndArray(); - jsonGenerator.writeBooleanField(FIELD_NAME_FINAL, structuredType.isFinal()); - jsonGenerator.writeBooleanField(FIELD_NAME_INSTANTIABLE, structuredType.isInstantiable()); - 
jsonGenerator.writeStringField( - FIELD_NAME_COMPARISON, structuredType.getComparison().name()); - if (structuredType.getSuperType().isPresent()) { - jsonGenerator.writeObjectField( - FIELD_NAME_SUPPER_TYPE, structuredType.getSuperType().get()); + if (!structuredType.isFinal()) { + jsonGenerator.writeBooleanField(FIELD_NAME_FINAL, false); } - if (structuredType.getDescription().isPresent()) { + if (!structuredType.isInstantiable()) { + jsonGenerator.writeBooleanField(FIELD_NAME_INSTANTIABLE, false); + } + if (structuredType.getComparison() != StructuredComparison.NONE) { jsonGenerator.writeStringField( - FIELD_NAME_DESCRIPTION, structuredType.getDescription().get()); + FIELD_NAME_COMPARISON, structuredType.getComparison().name()); + } + if (structuredType.getSuperType().isPresent()) { + jsonGenerator.writeObjectField( + FIELD_NAME_SUPER_TYPE, structuredType.getSuperType().get()); } - jsonGenerator.writeEndObject(); } - private void serializeDistinctType(DistinctType distinctType, JsonGenerator jsonGenerator) + private static void serializeSpecializedRaw(RawType<?> rawType, JsonGenerator jsonGenerator) throws IOException { - jsonGenerator.writeStartObject(); - jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, LogicalTypeRoot.DISTINCT_TYPE.name()); - Preconditions.checkArgument(distinctType.getObjectIdentifier().isPresent()); - jsonGenerator.writeObjectField( - FIELD_NAME_IDENTIFIER, distinctType.getObjectIdentifier().get()); - jsonGenerator.writeObjectField(FIELD_NAME_SOURCE_TYPE, distinctType.getSourceType()); - if (distinctType.getDescription().isPresent()) { + jsonGenerator.writeStringField(FIELD_NAME_CLASS, rawType.getOriginatingClass().getName()); + final TypeSerializer<?> serializer = rawType.getTypeSerializer(); + if (serializer.equals(NullSerializer.INSTANCE)) { jsonGenerator.writeStringField( - FIELD_NAME_DESCRIPTION, distinctType.getDescription().get()); + FIELD_NAME_SPECIAL_SERIALIZER, FIELD_VALUE_EXTERNAL_SERIALIZER_NULL); + } else if (serializer 
instanceof ExternalSerializer) { + final ExternalSerializer<?, ?> externalSerializer = + (ExternalSerializer<?, ?>) rawType.getTypeSerializer(); + if (externalSerializer.isInternalInput()) { + throw new TableException( + "Asymmetric external serializers are currently not supported. " + + "The input must not be internal if the output is external."); + } + jsonGenerator.writeObjectField( + FIELD_NAME_EXTERNAL_DATA_TYPE, externalSerializer.getDataType()); + } else { + throw new TableException("Unsupported special case for RAW type."); } - jsonGenerator.writeEndObject(); } - private void serializeTimestampType(TimestampType timestampType, JsonGenerator jsonGenerator) - throws IOException { - jsonGenerator.writeStartObject(); - jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, timestampType.getTypeRoot().name()); - jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, timestampType.isNullable()); - jsonGenerator.writeNumberField(FIELD_NAME_PRECISION, timestampType.getPrecision()); - jsonGenerator.writeObjectField(FIELD_NAME_TIMESTAMP_KIND, timestampType.getKind()); - jsonGenerator.writeEndObject(); - } + // -------------------------------------------------------------------------------------------- + // Compact Serialization + // -------------------------------------------------------------------------------------------- - private void serializeZonedTimestampType( - ZonedTimestampType timestampType, JsonGenerator jsonGenerator) throws IOException { - jsonGenerator.writeStartObject(); - jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, timestampType.getTypeRoot().name()); - jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, timestampType.isNullable()); - jsonGenerator.writeNumberField(FIELD_NAME_PRECISION, timestampType.getPrecision()); - jsonGenerator.writeObjectField(FIELD_NAME_TIMESTAMP_KIND, timestampType.getKind()); - jsonGenerator.writeEndObject(); + private static boolean supportsCompactSerialization( + LogicalType logicalType, boolean 
serializeCatalogObjects) { + return logicalType.accept(new CompactSerializationChecker(serializeCatalogObjects)); } - private void serializeLocalZonedTimestampType( - LocalZonedTimestampType timestampType, JsonGenerator jsonGenerator) throws IOException { - jsonGenerator.writeStartObject(); - jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, timestampType.getTypeRoot().name()); - jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, timestampType.isNullable()); - jsonGenerator.writeNumberField(FIELD_NAME_PRECISION, timestampType.getPrecision()); - jsonGenerator.writeObjectField(FIELD_NAME_TIMESTAMP_KIND, timestampType.getKind()); - jsonGenerator.writeEndObject(); + private static void serializeTypeWithCompactSerialization( + LogicalType logicalType, JsonGenerator jsonGenerator) throws IOException { + final String compactString = logicalType.asSerializableString(); + jsonGenerator.writeString(compactString); } - @SuppressWarnings("rawtypes") - private void serializeRawType( - RawType<?> rawType, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) - throws IOException { - TypeSerializer<?> typeSer = rawType.getTypeSerializer(); - if (typeSer instanceof ExternalSerializer) { - ExternalSerializer externalSer = (ExternalSerializer) typeSer; - // Currently, ExternalSerializer with `isInternalInput=false` will be serialized, - // Once `isInternalInput=true` needs to be serialized, we can add individual field in - // the json to support it, and the new json plan is compatible with the previous one. 
- if (externalSer.isInternalInput()) { - throw new TableException( - "ExternalSerializer with `isInternalInput=true` is not supported."); - } - DataType dataType = externalSer.getDataType(); - boolean isMapView = DataViewUtils.isMapViewDataType(dataType); - boolean isListView = DataViewUtils.isListViewDataType(dataType); - if (isMapView || isListView) { - jsonGenerator.writeStartObject(); - jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, LogicalTypeRoot.RAW.name()); - jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, rawType.isNullable()); - if (isMapView) { - jsonGenerator.writeStringField( - FIELD_NAME_DATA_VIEW_CLASS, MapView.class.getName()); - KeyValueDataType keyValueDataType = - DataViewUtils.extractKeyValueDataTypeForMapView(dataType); - serializeDataTypeForDataView( - FIELD_NAME_KEY_TYPE, - keyValueDataType.getKeyDataType(), - jsonGenerator, - serializerProvider); - serializeDataTypeForDataView( - FIELD_NAME_VALUE_TYPE, - keyValueDataType.getValueDataType(), - jsonGenerator, - serializerProvider); - } else { - jsonGenerator.writeStringField( - FIELD_NAME_DATA_VIEW_CLASS, ListView.class.getName()); - DataType elementType = - DataViewUtils.extractElementDataTypeForListView(dataType); - serializeDataTypeForDataView( - FIELD_NAME_ELEMENT_TYPE, - elementType, - jsonGenerator, - serializerProvider); - } - jsonGenerator.writeEndObject(); - return; - } + /** + * Checks whether the given type can be serialized as a compact string created from {@link + * LogicalType#asSerializableString()}. 
+ */ + private static class CompactSerializationChecker extends LogicalTypeDefaultVisitor<Boolean> { + + private final boolean serializeCatalogObjects; + + CompactSerializationChecker(boolean serializeCatalogObjects) { + this.serializeCatalogObjects = serializeCatalogObjects; } - jsonGenerator.writeObject(rawType.asSerializableString()); - } + @Override + public Boolean visit(CharType charType) { + return charType.getLength() > 0; + } - private void serializeDataTypeForDataView( - String key, - DataType dataType, - JsonGenerator jsonGenerator, - SerializerProvider serializerProvider) - throws IOException { - jsonGenerator.writeFieldName(key); - jsonGenerator.writeStartObject(); - jsonGenerator.writeBooleanField( - FIELD_NAME_IS_INTERNAL_TYPE, DataTypeUtils.isInternal(dataType)); - jsonGenerator.writeFieldName(FIELD_NAME_TYPE_NAME); - LogicalType logicalType = LogicalTypeDataTypeConverter.toLogicalType(dataType); - serialize(logicalType, jsonGenerator, serializerProvider); - jsonGenerator.writeEndObject(); + @Override + public Boolean visit(VarCharType varCharType) { + return varCharType.getLength() > 0; + } + + @Override + public Boolean visit(BinaryType binaryType) { + return binaryType.getLength() > 0; + } + + @Override + public Boolean visit(VarBinaryType varBinaryType) { + return varBinaryType.getLength() > 0; + } + + @Override + public Boolean visit(TimestampType timestampType) { + return timestampType.getKind() == TimestampKind.REGULAR; + } + + @Override + public Boolean visit(ZonedTimestampType zonedTimestampType) { + return zonedTimestampType.getKind() == TimestampKind.REGULAR; + } + + @Override + public Boolean visit(LocalZonedTimestampType localZonedTimestampType) { + return localZonedTimestampType.getKind() == TimestampKind.REGULAR; + } + + @Override + public Boolean visit(DistinctType distinctType) { + // catalog-based distinct types are always string serializable, + // however, depending on the configuration, we serialize the entire type + return 
!serializeCatalogObjects; + } + + @Override + public Boolean visit(StructuredType structuredType) { + // catalog-based structured types are always string serializable, + // however, depending on the configuration, we serialize the entire type + return structuredType.getObjectIdentifier().isPresent() && !serializeCatalogObjects; Review comment: I'm not sure I understand the question correctly, but this is exactly what we do: the children will still have a compact representation. However, unlike `ROW`, structured types have no string representation defined in the SQL standard. We would need to invent syntax that should actually be defined by a type DDL. ########## File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DataTypeJsonSerdeTest.java ########## @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.table.planner.plan.nodes.exec.serde; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.catalog.CatalogManager; +import org.apache.flink.table.module.ModuleManager; +import org.apache.flink.table.planner.calcite.FlinkContextImpl; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.utils.CatalogManagerMocks; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +import java.io.IOException; +import java.io.StringWriter; +import java.util.Arrays; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link DataType} serialization and deserialization. */ +@RunWith(Parameterized.class) +public class DataTypeJsonSerdeTest { + + @Parameter public DataType dataType; + + @Test + public void testDataTypeSerde() throws IOException { + final ObjectMapper mapper = configuredObjectMapper(); + final String json = toJson(mapper, dataType); + final DataType actual = toDataType(mapper, json); + + if (json.contains("children")) { + System.out.println(); + } Review comment: Sorry, my bad. 
########## File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerdeTest.java ########## @@ -249,28 +331,44 @@ public void testLogicalTypeSerde() throws IOException { ObjectIdentifier.of("cat", "db", "distinctType"), new VarCharType(false, 5)) .build(), + // custom RawType + new RawType<>(Integer.class, IntSerializer.INSTANCE), Review comment: It doesn't really make a difference. But I replaced it with `LocalDateTime`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org