Antoine Michaud created FLINK-26470:
---------------------------------------
Summary: [Java][TypeExtractor] Missing type information in POJO
types of some types (List, Map, UUID)
Key: FLINK-26470
URL: https://issues.apache.org/jira/browse/FLINK-26470
Project: Flink
Issue Type: Bug
Components: API / Core
Affects Versions: 1.13.2
Reporter: Antoine Michaud
h2. Problem:
h4. Basic collections (List, Map) and custom types such as UUID are not compatible with Flink's POJO serialization.
h2. Explanation:
As the documentation says, we should not use Kryo in production, since its serialization performance is poor.
To stop using Kryo and rely on native POJO serialization, we disable generic types:
{code:java}
env.getConfig().disableGenericTypes(){code}
But POJOs have to meet [some requirements|https://nightlies.apache.org/flink/flink-docs-release-1.10/dev/types_serialization.html#rules-for-pojo-types].
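For illustration, a minimal sketch of a class that satisfies those rules (the class and field names here are hypothetical, not from Flink):

```java
// Hypothetical event class, just to illustrate the POJO rules: public class,
// public no-argument constructor, and every field either public or exposed
// through a getter/setter pair.
public class Event {
    public long id;       // primitive: serialized natively
    public String name;   // basic type: serialized natively

    // A field like `public List<String> tags;` or `public UUID key;` would
    // push that field into the generic (Kryo) path.

    public Event() {}     // required public no-arg constructor

    public Event(long id, String name) {
        this.id = id;
        this.name = name;
    }
}
```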
Consider the following code from flink-core v1.13.2 (it looks the same in v1.14.4):
{code:java}
private <OUT, IN1, IN2> TypeInformation<OUT> privateGetForClass(
        Class<OUT> clazz,
        List<Type> typeHierarchy,
        ParameterizedType parameterizedType,
        TypeInformation<IN1> in1Type,
        TypeInformation<IN2> in2Type) {
    checkNotNull(clazz);

    // check if type information can be produced using a factory
    final TypeInformation<OUT> typeFromFactory =
            createTypeInfoFromFactory(clazz, typeHierarchy, in1Type, in2Type);
    if (typeFromFactory != null) {
        return typeFromFactory;
    }

    // Object is handled as generic type info
    if (clazz.equals(Object.class)) {
        return new GenericTypeInfo<>(clazz);
    }

    // Class is handled as generic type info
    if (clazz.equals(Class.class)) {
        return new GenericTypeInfo<>(clazz);
    }

    // recursive types are handled as generic type info
    if (countTypeInHierarchy(typeHierarchy, clazz) > 1) {
        return new GenericTypeInfo<>(clazz);
    }

    // check for arrays
    if (clazz.isArray()) {
        // primitive arrays: int[], byte[], ...
        PrimitiveArrayTypeInfo<OUT> primitiveArrayInfo =
                PrimitiveArrayTypeInfo.getInfoFor(clazz);
        if (primitiveArrayInfo != null) {
            return primitiveArrayInfo;
        }

        // basic type arrays: String[], Integer[], Double[]
        BasicArrayTypeInfo<OUT, ?> basicArrayInfo = BasicArrayTypeInfo.getInfoFor(clazz);
        if (basicArrayInfo != null) {
            return basicArrayInfo;
        }
        // object arrays
        else {
            TypeInformation<?> componentTypeInfo =
                    createTypeInfoWithTypeHierarchy(
                            typeHierarchy, clazz.getComponentType(), in1Type, in2Type);

            return ObjectArrayTypeInfo.getInfoFor(clazz, componentTypeInfo);
        }
    }

    // check for writable types
    if (isHadoopWritable(clazz)) {
        return createHadoopWritableTypeInfo(clazz);
    }

    // check for basic types
    TypeInformation<OUT> basicTypeInfo = BasicTypeInfo.getInfoFor(clazz);
    if (basicTypeInfo != null) {
        return basicTypeInfo;
    }

    // check for SQL time types
    TypeInformation<OUT> timeTypeInfo = SqlTimeTypeInfo.getInfoFor(clazz);
    if (timeTypeInfo != null) {
        return timeTypeInfo;
    }

    // check for subclasses of Value
    if (Value.class.isAssignableFrom(clazz)) {
        Class<? extends Value> valueClass = clazz.asSubclass(Value.class);
        return (TypeInformation<OUT>) ValueTypeInfo.getValueTypeInfo(valueClass);
    }

    // check for subclasses of Tuple
    if (Tuple.class.isAssignableFrom(clazz)) {
        if (clazz == Tuple0.class) {
            return new TupleTypeInfo(Tuple0.class);
        }
        throw new InvalidTypesException(
                "Type information extraction for tuples (except Tuple0) cannot be done based on the class.");
    }

    // check for Enums
    if (Enum.class.isAssignableFrom(clazz)) {
        return new EnumTypeInfo(clazz);
    }

    // special case for POJOs generated by Avro.
    if (hasSuperclass(clazz, AVRO_SPECIFIC_RECORD_BASE_CLASS)) {
        return AvroUtils.getAvroUtils().createAvroTypeInfo(clazz);
    }

    if (Modifier.isInterface(clazz.getModifiers())) {
        // Interface has no members and is therefore not handled as POJO
        return new GenericTypeInfo<>(clazz);
    }

    try {
        Type t = parameterizedType != null ? parameterizedType : clazz;
        TypeInformation<OUT> pojoType =
                analyzePojo(t, new ArrayList<>(typeHierarchy), in1Type, in2Type);
        if (pojoType != null) {
            return pojoType;
        }
    } catch (InvalidTypesException e) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Unable to handle type " + clazz + " as POJO. Message: " + e.getMessage(), e);
        }
        // ignore and create generic type info
    }

    // return a generic type
    return new GenericTypeInfo<>(clazz);
}{code}
Only the following types are compatible (i.e. not treated as GenericType):
* all custom POJOs annotated with @TypeInfo
* arrays
* all Hadoop Writable types
* basic types (String, BigInteger/BigDecimal, Instant/Date, boxed primitives)
* primitive types
* SQL time types
* all classes implementing org.apache.flink.types.Value
* all subclasses of org.apache.flink.api.java.tuple.Tuple
* enums
* Avro types
* nested POJOs
But not:
* List and Map, since they fall into the `Modifier.isInterface(clazz.getModifiers())` branch
* UUID, since it is treated as a generic type (its fields have no getters/setters)
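Both failure modes can be reproduced with plain reflection. This sketch only mirrors the checks TypeExtractor performs (in simplified form; the real logic in analyzePojo is more involved) and does not depend on Flink:

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.List;
import java.util.Map;
import java.util.UUID;

public class ExtractorChecks {

    // Mirrors the branch that sends List and Map to GenericTypeInfo.
    static boolean isInterface(Class<?> clazz) {
        return Modifier.isInterface(clazz.getModifiers());
    }

    // Simplified version of the POJO field rule: every non-static,
    // non-transient field must be public or have a getter/setter pair.
    static boolean passesPojoFieldRules(Class<?> clazz) {
        for (Field f : clazz.getDeclaredFields()) {
            int mods = f.getModifiers();
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods)) {
                continue;
            }
            if (Modifier.isPublic(mods)) {
                continue;
            }
            String suffix =
                    Character.toUpperCase(f.getName().charAt(0)) + f.getName().substring(1);
            try {
                clazz.getMethod("get" + suffix);
                clazz.getMethod("set" + suffix, f.getType());
            } catch (NoSuchMethodException e) {
                return false; // no accessor pair -> not a valid POJO field
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(isInterface(List.class));          // true -> GenericTypeInfo
        System.out.println(isInterface(Map.class));           // true -> GenericTypeInfo
        // UUID's mostSigBits/leastSigBits are private with no setters:
        System.out.println(passesPojoFieldRules(UUID.class)); // false
    }
}
```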
{quote}By the way, we can't even register a custom serializer, which would be the perfect workaround (the @TypeInfo documentation mentions TypeExtractor#registerFactory(Type, Class), but no such method exists).
{quote}
h3. How to fix it?
ListTypeInfo and MapTypeInfo already exist and could simply be used by TypeExtractor.privateGetForClass(...).
For UUID, we could create a customisable TypeInformationFactory that contains all the specific handling that does not fit the native Flink libs. The other way is to add it as a BasicType.
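Whichever way it gets wired in, the encoding itself is trivial: a UUID is exactly two longs, so a dedicated serializer could write a fixed 16 bytes with no class metadata (unlike the generic Kryo fallback). A minimal sketch of that encoding, in plain Java with no Flink dependency (class and method names are hypothetical):

```java
import java.nio.ByteBuffer;
import java.util.UUID;

public class UuidCodec {

    // Fixed-length encoding: 16 bytes, most-significant bits first.
    static byte[] serialize(UUID uuid) {
        return ByteBuffer.allocate(16)
                .putLong(uuid.getMostSignificantBits())
                .putLong(uuid.getLeastSignificantBits())
                .array();
    }

    static UUID deserialize(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        return new UUID(buf.getLong(), buf.getLong());
    }

    public static void main(String[] args) {
        UUID original = UUID.randomUUID();
        UUID roundTripped = deserialize(serialize(original));
        System.out.println(original.equals(roundTripped)); // prints "true"
    }
}
```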
_+I can help to contribute!+_
Thanks!
--
This message was sent by Atlassian Jira
(v8.20.1#820001)