Hi,
Here's a minimal example using an ArrayList, a HashSet, and a TreeSet:
```
package com.example;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.TreeSet;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class App {
    public static class Pojo {
        public ArrayList<Integer> list;
        public HashSet<Integer> set;
        public TreeSet<Integer> treeset;

        public Pojo() {
            this.list = new ArrayList<>();
            this.set = new HashSet<>();
            this.treeset = new TreeSet<>();
        }
    }

    public static void main(String[] args) throws Exception {
        var env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Fail fast instead of silently falling back to the generic serializer.
        env.getConfig().disableGenericTypes();
        env.fromElements(new Pojo()).print();
        env.execute("Pipeline");
    }
}
```
The result of running it:
```
13:08:20,074 INFO org.apache.flink.api.java.typeutils.TypeExtractor
[] - class java.util.ArrayList does not contain a setter for field size
13:08:20,077 INFO org.apache.flink.api.java.typeutils.TypeExtractor
[] - Class class java.util.ArrayList cannot be used as a POJO type because not
all fields are valid POJO fields, and must be processed as GenericType. Please
read the Flink documentation on "Data Types & Serialization" for details of the
effect on performance and schema evolution.
13:08:20,078 INFO org.apache.flink.api.java.typeutils.TypeExtractor
[] - Field Pojo#list will be processed as GenericType. Please read the Flink
documentation on "Data Types & Serialization" for details of the effect on
performance and schema evolution.
13:08:20,079 INFO org.apache.flink.api.java.typeutils.TypeExtractor
[] - No fields were detected for class java.util.HashSet so it cannot be used
as a POJO type and must be processed as GenericType. Please read the Flink
documentation on "Data Types & Serialization" for details of the effect on
performance and schema evolution.
13:08:20,079 INFO org.apache.flink.api.java.typeutils.TypeExtractor
[] - Field Pojo#set will be processed as GenericType. Please read the Flink
documentation on "Data Types & Serialization" for details of the effect on
performance and schema evolution.
13:08:20,079 INFO org.apache.flink.api.java.typeutils.TypeExtractor
[] - No fields were detected for class java.util.TreeSet so it cannot be used
as a POJO type and must be processed as GenericType. Please read the Flink
documentation on "Data Types & Serialization" for details of the effect on
performance and schema evolution.
13:08:20,079 INFO org.apache.flink.api.java.typeutils.TypeExtractor
[] - Field Pojo#sset will be processed as GenericType. Please read the Flink
documentation on "Data Types & Serialization" for details of the effect on
performance and schema evolution.
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner
(file:/Users/sammar/.m2/repository/org/apache/flink/flink-core/1.17.1/flink-core-1.17.1.jar)
to field java.lang.String.value
WARNING: Please consider reporting this to the maintainers of
org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal
reflective access operations
WARNING: All illegal access operations will be denied in a future release
Exception in thread "main" java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.ArrayList is treated as a generic type.
    at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:87)
    at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:350)
    at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:342)
    at org.apache.flink.streaming.api.graph.StreamGraph.createSerializer(StreamGraph.java:1037)
    at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:419)
    at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:391)
    at org.apache.flink.streaming.api.graph.StreamGraph.addLegacySource(StreamGraph.java:345)
    at org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateInternal(LegacySourceTransformationTranslator.java:66)
    at org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateForStreamingInternal(LegacySourceTransformationTranslator.java:53)
    at org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator.translateForStreamingInternal(LegacySourceTransformationTranslator.java:40)
    at org.apache.flink.streaming.api.graph.SimpleTransformationTranslator.translateForStreaming(SimpleTransformationTranslator.java:62)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:849)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:579)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.getParentInputIds(StreamGraphGenerator.java:870)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.translate(StreamGraphGenerator.java:828)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:579)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:319)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2248)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2239)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2225)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2052)
    at com.example.App.main(App.java:26)
```
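For context on what the INFO lines seem to be complaining about, here is a rough, JDK-only sketch of the kind of field check involved. It is not Flink's actual TypeExtractor logic. The class `PojoFieldCheck`, the sample class `Good`, and the helper names are all hypothetical, and the rule it encodes (every non-static, non-transient field must be public or have both a getter and a setter) is only my simplified reading of the messages above.
```
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;

public class PojoFieldCheck {

    /** Hypothetical sample: one public field, one private field with accessors. */
    public static class Good {
        public int id;
        private String name;
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
    }

    /** Collects non-static, non-transient fields of the class and its superclasses. */
    static List<Field> candidateFields(Class<?> type) {
        List<Field> result = new ArrayList<>();
        for (Class<?> c = type; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                int mod = f.getModifiers();
                if (!Modifier.isStatic(mod) && !Modifier.isTransient(mod)) {
                    result.add(f);
                }
            }
        }
        return result;
    }

    /** True if the class exposes a public method with this name and parameter count. */
    static boolean hasMethod(Class<?> type, String name, int paramCount) {
        for (Method m : type.getMethods()) {
            if (m.getName().equals(name) && m.getParameterCount() == paramCount) {
                return true;
            }
        }
        return false;
    }

    /** Names of candidate fields that are neither public nor covered by a getter and a setter. */
    static List<String> offendingFields(Class<?> type) {
        List<String> bad = new ArrayList<>();
        for (Field f : candidateFields(type)) {
            if (Modifier.isPublic(f.getModifiers())) {
                continue;
            }
            String n = f.getName();
            String suffix = n.substring(0, 1).toUpperCase(Locale.ROOT) + n.substring(1);
            boolean getter = hasMethod(type, "get" + suffix, 0) || hasMethod(type, "is" + suffix, 0);
            boolean setter = hasMethod(type, "set" + suffix, 1);
            if (!getter || !setter) {
                bad.add(n);
            }
        }
        return bad;
    }

    public static void main(String[] args) {
        System.out.println("Good offending fields:      " + offendingFields(Good.class));
        System.out.println("ArrayList offending fields: " + offendingFields(ArrayList.class));
        System.out.println("HashSet candidate fields:   " + candidateFields(HashSet.class));
    }
}
```
If this reading is right, it would explain why `ArrayList` is rejected over its private `size` field, while `HashSet` yields no usable fields at all because its state is held in static or transient fields.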
Best regards,
Saleh.
> On 14 Aug 2023, at 12:48 PM, Alexey Novakov via user <[email protected]>
> wrote:
>
> Hi Saleh,
>
> If you could show us the minimal code example of the issue (event classes), I
> think someone could help you to solve it.
>
> Best regards,
> Alexey
>
> On Mon, Aug 14, 2023 at 9:23 AM <[email protected]> wrote:
> Hi,
>
> According to this blog post
> https://flink.apache.org/2020/04/15/flink-serialization-tuning-vol.-1-choosing-your-serializer-if-you-can/#pojoserializer
> the "Must be processed as GenericType" message means that the POJO serializer
> will not be used and Kryo will be used instead.
>
> I created a simple POJO with a Java collection field to test this again, and
> I got the same message. Disabling generic types throws an exception.
>
> I'm not sure how to use these types with the POJO serializer or any other
> fast serializer.
>
> Best regards,
> Saleh.
>
>
>
>> On 14 Aug 2023, at 4:59 AM, liu ron <[email protected]> wrote:
>>
>> Hi,
>>
>> According to the test in [1], I think Flink can recognize a POJO class that
>> contains a Java List, so you can refer to the POJO class used in that test
>> as an example.
>>
>> [1]
>> https://github.com/apache/flink/blob/fc2b5d8f53a41695117f6eaf4c798cc183cf1e36/flink-core/src/test/java/org/apache/flink/api/java/typeutils/PojoTypeExtractionTest.java#L192
>>
>> Best,
>> Ron
>>
>> On Sun, 13 Aug 2023 at 22:50, <[email protected]> wrote:
>> Greetings,
>>
>> I am working on a project that needs to process around 100k events per
>> second and I'm trying to improve performance.
>>
>> Most of the classes being used are POJOs, but a couple of their fields use a
>> `java.util` class such as `ArrayList`, `HashSet`, or `SortedSet`. This
>> forces Flink to fall back to Kryo and log these messages:
>>
>> ```
>> class java.util.ArrayList does not contain a setter for field size
>> Class class java.util.ArrayList cannot be used as a POJO type because not
>> all fields are valid POJO fields, and must be processed as GenericType.
>> Please read the Flink documentation on "Data Types & Serialization" for
>> details of the effect on performance and schema evolution.
>> ```
>>
>> ```
>> No fields were detected for class java.util.HashSet so it cannot be used as
>> a POJO type and must be processed as GenericType. Please read the Flink
>> documentation on "Data Types & Serialization" for details of the effect on
>> performance and schema evolution.
>> ```
>>
>> My question is: what do I need to do to get Flink to recognize my classes as
>> POJOs and use the POJO serializer for better performance?
>> I read through the documentation and Stack Overflow, and the conclusion is
>> that I need to write a `TypeInfoFactory` and reference it in a `@TypeInfo`
>> annotation on my POJO.
>> While this seems incredibly tedious and I keep thinking "there must be a
>> better way", I would be fine with this solution if I could figure out how to
>> do this for the Set types I'm using.
>>
>> Any help would be appreciated.
>