Hi Tejas,

Yes, you can write a typefactory for enum. But I am assuming Flink should be 
able to recognize enum by default…

Anyways, you can do something like this:

Types.ENUM(RuleType.class);

This will return you a TypeInfomation which can be used to construct a 
typefactory..

BTW, could you take a look at my question in email: “How to define 
TypeInformation for Flink recursive resolved POJO” 😊 ?

Thanks,
Fuyao




From: Tejas B <tejasub1...@gmail.com>
Date: Thursday, May 12, 2022 at 16:32
To: Weihua Hu <huweihua....@gmail.com>
Cc: user <user@flink.apache.org>
Subject: [External] : Re: How to get flink to use POJO serializer when enum is 
present in POJO class
Hi Weihua,
This is the error I am getting :
Caused by: org.apache.flink.util.FlinkException: Could not restore operator 
state backend for 
CoBroadcastWithNonKeyedOperator_8c5504f305beefca0724b3e55af8ea26_(1/1) from any 
of the 1 provided restore options. at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
 at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:286)
 at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:174)
 ... 11 more Caused by: 
org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to 
restore operator state backend at 
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
 at 
org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createOperatorStateBackend(HashMapStateBackend.java:148)
 at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:277)
 at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
 at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
 ... 13 more Caused by: com.esotericsoftware.kryo.KryoException: Unable to find 
class: 11 at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
 at 
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
 at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)

From the error it looks like it's falling back to Kryo serializer instead of 
POJO serializer.

Thanks,
Tejas


On Thu, May 12, 2022 at 7:33 AM Weihua Hu 
<huweihua....@gmail.com<mailto:huweihua....@gmail.com>> wrote:
Hi, Tejas

These code is works in my idea environment.
Could you provide more error info or log?


Best,
Weihua


2022年5月10日 下午1:22,Tejas B <tejasub1...@gmail.com<mailto:tejasub1...@gmail.com>> 
写道:

Hi,
I am trying to get flink schema evolution to work for me using POJO serializer. 
But I found out that if an enum is present in the POJO then the POJO serializer 
is not used. Example of my POJO is as follows :

public class Rule {

String id;

int val;

RuleType ruleType;

//Newly added field

//int val2 = 0;



public Rule() {}



public Rule(String id, int val, RuleType ruleType) {

    this.id = id;

    this.val = val;

    this.ruleType = ruleType;

    //this.val2 = val2;

}



public String getId() {

    return id;

}



public void setId(String id) {

    this.id = id;

}



public int getVal() {

    return val;

}



public void setVal(int val) {

    this.val = val;

}



public RuleType getRuleType() {

    return ruleType;

}



public void setRuleType(RuleType ruleType) {

    this.ruleType = ruleType;

}



//public int getVal2() {

//    return val2;

//}



//public void setVal2(int val2) {

//    this.val2 = val2;

//}



@Override

public boolean equals(Object o) {

    if (this == o) return true;

    if (o == null || getClass() != o.getClass()) return false;

    Rule rule = (Rule) o;

    return val == rule.val && 
id.equals(rule.id<https://urldefense.com/v3/__http:/rule.id/__;!!ACWV5N9M2RV99hQ!In6iqbiXVGV5EOjV4INk4Kin1OlWIZ3n5wT67wwBK6rX_fZONISPLHWNnUGJKNHJYQhO8r_3JDK5HJdbT6Tl$>)
 && ruleType == rule.ruleType;

}



@Override

public int hashCode() {

    return Objects.hash(id, val, ruleType);

}



@Override

public String toString() {

    return "Rule{" +

            "name='" + id + '\'' +

            ", val=" + val +

            ", ruleType=" + ruleType +

            '}';

}

}

RuleType is an enum class as follows :

public enum RuleType {
    X,
    Y,
    Z

}

Now for the Rule class the schema evolution (Adding a new field called val2), 
works only if I write a custom typeFactory for this class.

Is there a way that I can write typeFactory for the enum class ? Why does the 
flink not recognize enum in a POJO class ?

Reply via email to