Hi Tejas,

Yes, you can write a type factory for the enum. But I would assume Flink should be able to recognize enums by default.
Anyway, you can do something like this:

    Types.ENUM(RuleType.class);

This will return a TypeInformation, which can be used to construct a type factory.

BTW, could you take a look at my question in the email “How to define TypeInformation for Flink recursive resolved POJO” 😊?

Thanks,
Fuyao

From: Tejas B <tejasub1...@gmail.com>
Date: Thursday, May 12, 2022 at 16:32
To: Weihua Hu <huweihua....@gmail.com>
Cc: user <user@flink.apache.org>
Subject: [External] : Re: How to get flink to use POJO serializer when enum is present in POJO class

Hi Weihua,

This is the error I am getting:

Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for CoBroadcastWithNonKeyedOperator_8c5504f305beefca0724b3e55af8ea26_(1/1) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:286)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:174)
    ... 11 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
    at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createOperatorStateBackend(HashMapStateBackend.java:148)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:277)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    ... 13 more
Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: 11
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)

From the error it looks like it's falling back to the Kryo serializer instead of the POJO serializer.

Thanks,
Tejas

On Thu, May 12, 2022 at 7:33 AM Weihua Hu <huweihua....@gmail.com> wrote:

Hi Tejas,

This code works in my IDEA environment. Could you provide more error info or logs?

Best,
Weihua

On May 10, 2022, at 1:22 PM, Tejas B <tejasub1...@gmail.com> wrote:

Hi,

I am trying to get Flink schema evolution to work for me using the POJO serializer. But I found out that if an enum is present in the POJO, then the POJO serializer is not used. An example of my POJO is as follows:

    public class Rule {
        String id;
        int val;
        RuleType ruleType;
        //Newly added field
        //int val2 = 0;

        public Rule() {}

        public Rule(String id, int val, RuleType ruleType) {
            this.id = id;
            this.val = val;
            this.ruleType = ruleType;
            //this.val2 = val2;
        }

        public String getId() {
            return id;
        }

        public void setId(String id) {
            this.id = id;
        }

        public int getVal() {
            return val;
        }

        public void setVal(int val) {
            this.val = val;
        }

        public RuleType getRuleType() {
            return ruleType;
        }

        public void setRuleType(RuleType ruleType) {
            this.ruleType = ruleType;
        }

        //public int getVal2() {
        //    return val2;
        //}

        //public void setVal2(int val2) {
        //    this.val2 = val2;
        //}

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            Rule rule = (Rule) o;
            return val == rule.val && id.equals(rule.id) && ruleType == rule.ruleType;
        }

        @Override
        public int hashCode() {
            return Objects.hash(id, val, ruleType);
        }

        @Override
        public String toString() {
            return "Rule{" +
                    "name='" + id + '\'' +
                    ", val=" + val +
                    ", ruleType=" + ruleType +
                    '}';
        }
    }

RuleType is an enum class as follows:

    public enum RuleType {
        X,
        Y,
        Z
    }

Now, for the Rule class, schema evolution (adding a new field called val2) works only if I write a custom type factory for this class. Is there a way to write a type factory for the enum class? Why does Flink not recognize an enum in a POJO class?
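
One way to sanity-check the Rule class from the question against the POJO rules that Flink documents (public class, public no-argument constructor, every non-static, non-transient field either public or reachable through a getter/setter pair) is a small reflective check in plain Java. The sketch below is only a rough approximation and is not Flink's own type extraction: PojoRuleCheck, violations, and BrokenRule are hypothetical names introduced for illustration, and the check ignores inherited fields, boolean isX getters, and generic types. A clean result here does not prove that the POJO serializer is chosen at runtime, and it says nothing about the Kryo restore error in the stack trace.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: approximates the documented POJO rules.
final class PojoRuleCheck {

    // Trimmed copy of the enum and POJO from the question
    // (constructor with arguments, equals, hashCode, toString omitted).
    public enum RuleType { X, Y, Z }

    public static class Rule {
        String id;
        int val;
        RuleType ruleType;

        public Rule() {}

        public String getId() { return id; }
        public void setId(String id) { this.id = id; }
        public int getVal() { return val; }
        public void setVal(int val) { this.val = val; }
        public RuleType getRuleType() { return ruleType; }
        public void setRuleType(RuleType ruleType) { this.ruleType = ruleType; }
    }

    // Hypothetical counter-example: 'val' has a getter but no setter.
    public static class BrokenRule {
        String id;
        int val;

        public BrokenRule() {}

        public String getId() { return id; }
        public void setId(String id) { this.id = id; }
        public int getVal() { return val; }
    }

    // Returns one message per violated rule; an empty list means the class passes this rough check.
    static List<String> violations(Class<?> clazz) {
        List<String> problems = new ArrayList<>();
        if (!Modifier.isPublic(clazz.getModifiers())) {
            problems.add("class is not public");
        }
        try {
            clazz.getConstructor(); // only finds a public no-argument constructor
        } catch (NoSuchMethodException e) {
            problems.add("no public no-argument constructor");
        }
        for (Field field : clazz.getDeclaredFields()) {
            int mods = field.getModifiers();
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || field.isSynthetic()) {
                continue; // not part of the serialized state
            }
            if (!Modifier.isPublic(mods) && !hasGetterAndSetter(clazz, field)) {
                problems.add("field '" + field.getName()
                        + "' is not public and lacks a getter/setter pair");
            }
        }
        return problems;
    }

    // Looks for public getXxx() and setXxx(fieldType) methods matching the field name.
    private static boolean hasGetterAndSetter(Class<?> clazz, Field field) {
        String name = field.getName();
        String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
        try {
            clazz.getMethod("get" + suffix);
            clazz.getMethod("set" + suffix, field.getType());
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("Rule: " + violations(Rule.class));
        System.out.println("BrokenRule: " + violations(BrokenRule.class));
    }
}
```

With the classes above, violations(Rule.class) returns an empty list, while BrokenRule yields a single message about the val field. If the real Rule class also passes, the enum field by itself is probably not what breaks these particular rules, so the type information used when the state is declared would be the next thing to look at.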