Re: How to get flink to use POJO serializer when enum is present in POJO class
Hi Weihua,

This is the error I am getting:

Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for CoBroadcastWithNonKeyedOperator_8c5504f305beefca0724b3e55af8ea26_(1/1) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:286)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:174)
    ... 11 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
    at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createOperatorStateBackend(HashMapStateBackend.java:148)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:277)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    ... 13 more
Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: 11
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)

From the error it looks like Flink is falling back to the Kryo serializer instead of the POJO serializer.

Thanks,
Tejas

On Thu, May 12, 2022 at 7:33 AM Weihua Hu wrote:

> Hi, Tejas
>
> This code works in my IDEA environment.
> Could you provide more error info or log?
>
> Best,
> Weihua
>
> On May 10, 2022, at 1:22 PM, Tejas B wrote:
>
> Hi,
> I am trying to get Flink schema evolution to work for me using the POJO
> serializer. But I found out that if an enum is present in the POJO, then the
> POJO serializer is not used. An example of my POJO is as follows:
>
> import java.util.Objects;
>
> public class Rule {
>     String id;
>     int val;
>     RuleType ruleType;
>     // Newly added field
>     // int val2 = 0;
>
>     public Rule() {}
>
>     public Rule(String id, int val, RuleType ruleType) {
>         this.id = id;
>         this.val = val;
>         this.ruleType = ruleType;
>         // this.val2 = val2;
>     }
>
>     public String getId() {
>         return id;
>     }
>
>     public void setId(String id) {
>         this.id = id;
>     }
>
>     public int getVal() {
>         return val;
>     }
>
>     public void setVal(int val) {
>         this.val = val;
>     }
>
>     public RuleType getRuleType() {
>         return ruleType;
>     }
>
>     public void setRuleType(RuleType ruleType) {
>         this.ruleType = ruleType;
>     }
>
>     // public int getVal2() {
>     //     return val2;
>     // }
>
>     // public void setVal2(int val2) {
>     //     this.val2 = val2;
>     // }
>
>     @Override
>     public boolean equals(Object o) {
>         if (this == o) return true;
>         if (o == null || getClass() != o.getClass()) return false;
>         Rule rule = (Rule) o;
>         return val == rule.val && id.equals(rule.id) && ruleType == rule.ruleType;
>     }
>
>     @Override
>     public int hashCode() {
>         return Objects.hash(id, val, ruleType);
>     }
>
>     @Override
>     public String toString() {
>         return "Rule{" +
>                 "name='" + id + '\'' +
>                 ", val=" + val +
>                 ", ruleType=" + ruleType +
>                 '}';
>     }
> }
>
> RuleType is an enum class as follows:
>
> public enum RuleType {
>     X,
>     Y,
>     Z
> }
>
> Now for the Rule class, schema evolution (adding a new field called
> val2) works only if I write a custom typeFactory for this class.
>
> Is there a way that I can write a typeFactory for the enum class? Why does
> Flink not recognize an enum in a POJO class?
Re: How to get flink to use POJO serializer when enum is present in POJO class
Hi Arvid,

Thanks for replying. But I have all the getters and setters in the example. As you can see, the val2 field is commented out, and hence its getter and setter are commented out as well. When restoring from a savepoint, I uncomment these and get errors. If I remove the reference to the enum RuleType from the Rule class, then everything works fine, so it's definitely an enum issue.

What I am looking for is: why doesn't Flink recognize an enum for the POJO serializer?

Thanks,
Tejas

On Tue, May 10, 2022 at 2:06 AM Arvid Heise wrote:

> Is this your whole Rule class? If so, then the issue is not the type of
> the field but that you haven't added a setter and getter for it.
>
> On Tue, May 10, 2022 at 7:23 AM Tejas B wrote:
>
>> Hi,
>> I am trying to get Flink schema evolution to work for me using the POJO
>> serializer. But I found out that if an enum is present in the POJO, then the
>> POJO serializer is not used. An example of my POJO is as follows:
>>
>> import java.util.Objects;
>>
>> public class Rule {
>>     String id;
>>     int val;
>>     RuleType ruleType;
>>     // Newly added field
>>     // int val2 = 0;
>>
>>     public Rule() {}
>>
>>     public Rule(String id, int val, RuleType ruleType) {
>>         this.id = id;
>>         this.val = val;
>>         this.ruleType = ruleType;
>>         // this.val2 = val2;
>>     }
>>
>>     public String getId() {
>>         return id;
>>     }
>>
>>     public void setId(String id) {
>>         this.id = id;
>>     }
>>
>>     public int getVal() {
>>         return val;
>>     }
>>
>>     public void setVal(int val) {
>>         this.val = val;
>>     }
>>
>>     public RuleType getRuleType() {
>>         return ruleType;
>>     }
>>
>>     public void setRuleType(RuleType ruleType) {
>>         this.ruleType = ruleType;
>>     }
>>
>>     // public int getVal2() {
>>     //     return val2;
>>     // }
>>
>>     // public void setVal2(int val2) {
>>     //     this.val2 = val2;
>>     // }
>>
>>     @Override
>>     public boolean equals(Object o) {
>>         if (this == o) return true;
>>         if (o == null || getClass() != o.getClass()) return false;
>>         Rule rule = (Rule) o;
>>         return val == rule.val && id.equals(rule.id) && ruleType == rule.ruleType;
>>     }
>>
>>     @Override
>>     public int hashCode() {
>>         return Objects.hash(id, val, ruleType);
>>     }
>>
>>     @Override
>>     public String toString() {
>>         return "Rule{" +
>>                 "name='" + id + '\'' +
>>                 ", val=" + val +
>>                 ", ruleType=" + ruleType +
>>                 '}';
>>     }
>> }
>>
>> RuleType is an enum class as follows:
>>
>> public enum RuleType {
>>     X,
>>     Y,
>>     Z
>> }
>>
>> Now for the Rule class, schema evolution (adding a new field called
>> val2) works only if I write a custom typeFactory for this class.
>>
>> Is there a way that I can write a typeFactory for the enum class? Why does
>> Flink not recognize an enum in a POJO class?
How to get flink to use POJO serializer when enum is present in POJO class
Hi,

I am trying to get Flink schema evolution to work for me using the POJO serializer. But I found out that if an enum is present in the POJO, then the POJO serializer is not used. An example of my POJO is as follows:

import java.util.Objects;

public class Rule {
    String id;
    int val;
    RuleType ruleType;
    // Newly added field
    // int val2 = 0;

    public Rule() {}

    public Rule(String id, int val, RuleType ruleType) {
        this.id = id;
        this.val = val;
        this.ruleType = ruleType;
        // this.val2 = val2;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public int getVal() {
        return val;
    }

    public void setVal(int val) {
        this.val = val;
    }

    public RuleType getRuleType() {
        return ruleType;
    }

    public void setRuleType(RuleType ruleType) {
        this.ruleType = ruleType;
    }

    // public int getVal2() {
    //     return val2;
    // }

    // public void setVal2(int val2) {
    //     this.val2 = val2;
    // }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        Rule rule = (Rule) o;
        return val == rule.val && id.equals(rule.id) && ruleType == rule.ruleType;
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, val, ruleType);
    }

    @Override
    public String toString() {
        return "Rule{" +
                "name='" + id + '\'' +
                ", val=" + val +
                ", ruleType=" + ruleType +
                '}';
    }
}

RuleType is an enum class as follows:

public enum RuleType {
    X,
    Y,
    Z
}

Now for the Rule class, schema evolution (adding a new field called val2) works only if I write a custom typeFactory for this class.

Is there a way that I can write a typeFactory for the enum class? Why does Flink not recognize an enum in a POJO class?
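For readers who want to rule out the structural causes discussed in this thread, the sketch below approximates the structural POJO rules from the Flink documentation (public class, public no-argument constructor, and every non-static, non-transient field either public or reachable through a getter/setter pair) with plain reflection. It is not Flink's own type extraction: PojoRuleCheck, violations, hasGetterAndSetter, and BrokenRule are hypothetical names introduced here, inherited fields and field types are not checked, and the Rule class is a trimmed copy of the one in the post.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, NOT part of Flink: approximates the documented POJO rules via reflection.
class PojoRuleCheck {

    public enum RuleType { X, Y, Z }

    // Trimmed copy of the Rule class from the post (equals/hashCode/toString omitted).
    public static class Rule {
        String id;
        int val;
        RuleType ruleType;

        public Rule() {}

        public String getId() { return id; }
        public void setId(String id) { this.id = id; }
        public int getVal() { return val; }
        public void setVal(int val) { this.val = val; }
        public RuleType getRuleType() { return ruleType; }
        public void setRuleType(RuleType ruleType) { this.ruleType = ruleType; }
    }

    // Counter-example: a private field with no accessors.
    public static class BrokenRule {
        private String id;

        public BrokenRule() {}
    }

    // Returns a human-readable list of rule violations; an empty list means none were found.
    static List<String> violations(Class<?> clazz) {
        List<String> problems = new ArrayList<>();
        if (!Modifier.isPublic(clazz.getModifiers())) {
            problems.add("class is not public");
        }
        try {
            clazz.getConstructor(); // only finds public constructors
        } catch (NoSuchMethodException e) {
            problems.add("no public no-argument constructor");
        }
        // Simplification: only fields declared directly on the class are inspected.
        for (Field field : clazz.getDeclaredFields()) {
            int mods = field.getModifiers();
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || field.isSynthetic()) {
                continue;
            }
            if (!Modifier.isPublic(mods) && !hasGetterAndSetter(clazz, field)) {
                problems.add("field '" + field.getName() + "' is not public and lacks a getter/setter pair");
            }
        }
        return problems;
    }

    // Looks for public setX(type) plus getX() or isX() returning the field type.
    static boolean hasGetterAndSetter(Class<?> clazz, Field field) {
        String name = field.getName();
        String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
        try {
            clazz.getMethod("set" + suffix, field.getType());
        } catch (NoSuchMethodException e) {
            return false;
        }
        for (String prefix : new String[] {"get", "is"}) {
            try {
                if (clazz.getMethod(prefix + suffix).getReturnType() == field.getType()) {
                    return true;
                }
            } catch (NoSuchMethodException ignored) {
                // try the next prefix
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("Rule: " + violations(Rule.class));
        System.out.println("BrokenRule: " + violations(BrokenRule.class));
    }
}
```

The Rule class from the post reports no violations under these checks, which only says that its shape is fine. It does not show which serializer Flink actually selects for the state in question.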
Looking for suggestions about multithreaded CEP to be used with flink
Hi,

Here's our use case: we are planning to build a rule-based engine on top of Flink with a huge number of rules (thousands). The rules could be stateless or stateful.

An example of a stateless rule is: A.id = 3 && A.name = 'abc' || A.color = red.

An example of a stateful rule is: A is event.id = 3, B is event.name = 'abc', C is event.color = red, and we are looking for the pattern AB*C over a time window of 1 hour.

We have tried to use Flink CEP for this purpose, and the program crashed because of the large number of threads. The explanation is that every application of CEP.pattern creates a new operator in the job graph, and Flink can't support that many vertices in a graph.

Another approach could be to use a ProcessFunction in Flink, but to run the rules on the event stream you would still have to use some sort of CEP library or write your own.

My question is: does anybody have any other suggestions on how to achieve this? Are there other CEP engines that integrate and work better with Flink (Siddhi, Jasper, Drools)? Any experience would be helpful.
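As a rough illustration of the "write your own" option mentioned above, the sketch below keeps rules as data and evaluates all of them in a single function, rather than creating one operator per rule. It uses plain Java only. Every name in it (SingleOperatorRules, Event, StatelessRules, SequenceRule) is hypothetical and none of it is a Flink or CEP-library API. The stateful matcher is a deliberate simplification: it tracks a single partial match of A B* C with strict contiguity, and in a real job its state would presumably have to live in Flink managed state inside a process function.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical sketch: every name below is illustrative; nothing here is a Flink or CEP-library API.
class SingleOperatorRules {

    // Assumed event shape, using the three attributes mentioned in the post.
    static final class Event {
        final int id;
        final String name;
        final String color;
        final long timestampMs;

        Event(int id, String name, String color, long timestampMs) {
            this.id = id;
            this.name = name;
            this.color = color;
            this.timestampMs = timestampMs;
        }
    }

    // Stateless rules kept as data: one loop evaluates all of them for each event.
    static final class StatelessRules {
        private final Map<String, Predicate<Event>> rules = new LinkedHashMap<>();

        void add(String ruleName, Predicate<Event> condition) {
            rules.put(ruleName, condition);
        }

        List<String> matching(Event e) {
            List<String> hits = new ArrayList<>();
            for (Map.Entry<String, Predicate<Event>> rule : rules.entrySet()) {
                if (rule.getValue().test(e)) {
                    hits.add(rule.getKey());
                }
            }
            return hits;
        }
    }

    // Stateful rule "A B* C within windowMs": one partial match at a time, strict contiguity.
    static final class SequenceRule {
        private final Predicate<Event> a;
        private final Predicate<Event> b;
        private final Predicate<Event> c;
        private final long windowMs;
        private long startedAtMs = -1; // -1 means no A has been seen yet

        SequenceRule(Predicate<Event> a, Predicate<Event> b, Predicate<Event> c, long windowMs) {
            this.a = a;
            this.b = b;
            this.c = c;
            this.windowMs = windowMs;
        }

        // Returns true when this event completes the pattern.
        boolean onEvent(Event e) {
            if (startedAtMs >= 0 && e.timestampMs - startedAtMs > windowMs) {
                startedAtMs = -1; // partial match fell out of the time window
            }
            if (startedAtMs < 0) {
                if (a.test(e)) startedAtMs = e.timestampMs;
                return false;
            }
            if (c.test(e)) {
                startedAtMs = -1;
                return true;
            }
            if (b.test(e)) {
                return false; // still inside B*
            }
            // Neither B nor C: the sequence is broken; restart if this event is itself an A.
            startedAtMs = a.test(e) ? e.timestampMs : -1;
            return false;
        }
    }

    public static void main(String[] args) {
        StatelessRules stateless = new StatelessRules();
        stateless.add("rule-1", e -> (e.id == 3 && "abc".equals(e.name)) || "red".equals(e.color));
        System.out.println(stateless.matching(new Event(3, "abc", "blue", 0L)));

        SequenceRule sequence = new SequenceRule(
                e -> e.id == 3, e -> "abc".equals(e.name), e -> "red".equals(e.color), 3_600_000L);
        sequence.onEvent(new Event(3, "x", "blue", 0L));      // A
        sequence.onEvent(new Event(0, "abc", "blue", 1_000L)); // B
        System.out.println(sequence.onEvent(new Event(0, "x", "red", 2_000L))); // C
    }
}
```

With this shape, the number of operators in the job graph no longer grows with the number of rules. The cost is that overlapping matches, relaxed contiguity, and rule updates at runtime would all have to be implemented by hand.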