Re: How to get flink to use POJO serializer when enum is present in POJO class

2022-05-12 Thread Tejas B
Hi Weihua,
This is the error I am getting :
Caused by: org.apache.flink.util.FlinkException: Could not restore operator
state backend for
CoBroadcastWithNonKeyedOperator_8c5504f305beefca0724b3e55af8ea26_(1/1) from
any of the 1 provided restore options. at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:286)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:174)
... 11 more Caused by:
org.apache.flink.runtime.state.BackendBuildingException: Failed when trying
to restore operator state backend at
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
at
org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createOperatorStateBackend(HashMapStateBackend.java:148)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:277)
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
... 13 more Caused by: com.esotericsoftware.kryo.KryoException: Unable to
find class: 11 at
com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
at
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)

>From the error it looks like it's falling back to Kryo serializer instead
of POJO serializer.

Thanks,
Tejas


On Thu, May 12, 2022 at 7:33 AM Weihua Hu  wrote:

> Hi, Tejas
>
> These code is works in my idea environment.
> Could you provide more error info or log?
>
>
> Best,
> Weihua
>
> 2022年5月10日 下午1:22,Tejas B  写道:
>
> Hi,
> I am trying to get flink schema evolution to work for me using POJO
> serializer. But I found out that if an enum is present in the POJO then the
> POJO serializer is not used. Example of my POJO is as follows :
>
> public class Rule {
> String id;int val;
> RuleType ruleType;//Newly added field//int val2 = 0;
> public Rule() {}
> public Rule(String id, int val, RuleType ruleType) {
> this.id = id;
> this.val = val;
> this.ruleType = ruleType;
> //this.val2 = val2;
> }
> public String getId() {
> return id;
> }
> public void setId(String id) {
> this.id = id;
> }
> public int getVal() {
> return val;
> }
> public void setVal(int val) {
> this.val = val;
> }
> public RuleType getRuleType() {
> return ruleType;
> }
> public void setRuleType(RuleType ruleType) {
> this.ruleType = ruleType;
> }
> //public int getVal2() {//return val2;//}
> //public void setVal2(int val2) {//this.val2 = val2;//}
> @Overridepublic boolean equals(Object o) {
> if (this == o) return true;
> if (o == null || getClass() != o.getClass()) return false;
> Rule rule = (Rule) o;
> return val == rule.val && id.equals(rule.id) && ruleType == rule.ruleType;
> }
> @Overridepublic int hashCode() {
> return Objects.hash(id, val, ruleType);
> }
> @Overridepublic String toString() {
> return "Rule{" +
> "name='" + id + '\'' +
> ", val=" + val +
> ", ruleType=" + ruleType +
> '}';
> }
>
> }
>
> RuleType is an enum class as follows :
>
> public enum RuleType {
> X,
> Y,
> Z
>
> }
>
> Now for the Rule class the schema evolution (Adding a new field called
> val2), works only if I write a custom typeFactory for this class.
>
> Is there a way that I can write typeFactory for the enum class ? Why does
> the flink not recognize enum in a POJO class ?
>
>
>


Re: How to get flink to use POJO serializer when enum is present in POJO class

2022-05-11 Thread Tejas B
Hi Arvid,
Thanks for replying. But I have all the getters and setters in the example.
As you can see, the val2 field is commented and hence its getter and setter
are commented out. When restoring from a savepoint, I uncomment these and
get errors.
If I remove reference to the enum RuleType from the Rule class, then
everything works fine. So it's definitely a enum issue.
What I am looking for is, why doesn't flink recognize enum for POJO
serializer ?

Thanks,
Tejas


On Tue, May 10, 2022 at 2:06 AM Arvid Heise  wrote:

> Is this your whole Rule class? If so, then the issue is not the type of
> the field but that you haven't added a setter and getter for it.
>
> On Tue, May 10, 2022 at 7:23 AM Tejas B  wrote:
>
>> Hi,
>> I am trying to get flink schema evolution to work for me using POJO
>> serializer. But I found out that if an enum is present in the POJO then the
>> POJO serializer is not used. Example of my POJO is as follows :
>>
>> public class Rule {
>> String id;int val;
>> RuleType ruleType;//Newly added field//int val2 = 0;
>> public Rule() {}
>> public Rule(String id, int val, RuleType ruleType) {
>> this.id = id;
>> this.val = val;
>> this.ruleType = ruleType;
>> //this.val2 = val2;
>> }
>> public String getId() {
>> return id;
>> }
>> public void setId(String id) {
>> this.id = id;
>> }
>> public int getVal() {
>> return val;
>> }
>> public void setVal(int val) {
>> this.val = val;
>> }
>> public RuleType getRuleType() {
>> return ruleType;
>> }
>> public void setRuleType(RuleType ruleType) {
>> this.ruleType = ruleType;
>> }
>> //public int getVal2() {//return val2;//}
>> //public void setVal2(int val2) {//this.val2 = val2;//}
>> @Overridepublic boolean equals(Object o) {
>> if (this == o) return true;
>> if (o == null || getClass() != o.getClass()) return false;
>> Rule rule = (Rule) o;
>> return val == rule.val && id.equals(rule.id) && ruleType == 
>> rule.ruleType;
>> }
>> @Overridepublic int hashCode() {
>> return Objects.hash(id, val, ruleType);
>> }
>> @Overridepublic String toString() {
>> return "Rule{" +
>> "name='" + id + '\'' +
>> ", val=" + val +
>> ", ruleType=" + ruleType +
>> '}';
>> }
>>
>> }
>>
>> RuleType is an enum class as follows :
>>
>> public enum RuleType {
>> X,
>> Y,
>> Z
>>
>> }
>>
>> Now for the Rule class the schema evolution (Adding a new field called
>> val2), works only if I write a custom typeFactory for this class.
>>
>> Is there a way that I can write typeFactory for the enum class ? Why does
>> the flink not recognize enum in a POJO class ?
>>
>


How to get flink to use POJO serializer when enum is present in POJO class

2022-05-09 Thread Tejas B
Hi,
I am trying to get flink schema evolution to work for me using POJO
serializer. But I found out that if an enum is present in the POJO then the
POJO serializer is not used. Example of my POJO is as follows :

public class Rule {
String id;int val;
RuleType ruleType;//Newly added field//int val2 = 0;
public Rule() {}
public Rule(String id, int val, RuleType ruleType) {
this.id = id;
this.val = val;
this.ruleType = ruleType;
//this.val2 = val2;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public int getVal() {
return val;
}
public void setVal(int val) {
this.val = val;
}
public RuleType getRuleType() {
return ruleType;
}
public void setRuleType(RuleType ruleType) {
this.ruleType = ruleType;
}
//public int getVal2() {//return val2;//}
//public void setVal2(int val2) {//this.val2 = val2;//}
@Overridepublic boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Rule rule = (Rule) o;
return val == rule.val && id.equals(rule.id) && ruleType == rule.ruleType;
}
@Overridepublic int hashCode() {
return Objects.hash(id, val, ruleType);
}
@Overridepublic String toString() {
return "Rule{" +
"name='" + id + '\'' +
", val=" + val +
", ruleType=" + ruleType +
'}';
}

}

RuleType is an enum class as follows :

public enum RuleType {
X,
Y,
Z

}

Now for the Rule class the schema evolution (Adding a new field called
val2), works only if I write a custom typeFactory for this class.

Is there a way that I can write typeFactory for the enum class ? Why does
the flink not recognize enum in a POJO class ?


Looking for suggestions about multithreaded CEP to be used with flink

2021-08-19 Thread Tejas B
Hi,
Here's our use case :
We are planning to build a rule based engine on top of flink with huge number 
of rules(1000s). the rules could be stateless or stateful. 
Example stateless rule is : A.id = 3 && A.name = 'abc' || A.color = red. 
Example stateful rule is : A is event.id =3, B is event.name = 'abc', C is 
event.color = red and we are looking for pattern AB*C over time window of 1 
hour.

Now we have tried to use flink CEP for this purpose and program crashed because 
of lot of threads. The explanation is : every application of CEP.pattern 
creates a new operator in the graph and flink can't support that many vertices 
in a graph.

Other approach could be to use processFunction in flink, but still to run the 
rules on events stream you'd have to use some sort of CEP or write your own.

My question is, does anybody have any other suggestions on how to achieve this 
? Any other CEPs that integrate and work better with flink (siddhi, jasper, 
drools) ? Any experience would be helpful.