I have a DataStream on which I am applying a CEP pattern and grouping the
results using keyby(). The DataStream Object is a pojo :

public class DataStreamObject {
    private String field1;
    private String field2;

    public DataStreamObject(String field1, String field2) {
        this.field1 = field1;
        this.field2 = field2;
    }

    public void setField1(String field1) {
        this.field1 = field1;
    }

    public String getField1() {
        return field1;
    }


    public void setField2(String field2) {
        this.field2 = field2;
    }

    public String getField2() {
        return field2;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof DataStreamObject)) return false;

        DataStreamObject that = (DataStreamObject) o;

        if (!getField1().equals(that.getField1())) return false;
        return getField2().equals(that.getField2());
    }

    @Override
    public int hashCode() {
        int result = getField1().hashCode();
        result = 31 * result + getField2().hashCode();
        return result;
    }

    @Override
    public String toString() {
        return "DriverSameAsCustomer{" +
                "field1='" + field1 + '\'' +
                ", field2='" + field2 + '\'' +
                '}';
    }
}

When I submit my flinkjob, I get the following error :


This type (GenericType<com.foo.DataStreamObject>) cannot be used as key.
        
org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:330)
        
org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:294)
        com.foo.Main.main(Main.java:66)


As I understand, I do not need to implement Key interface if the class
is a POJO (which it is).

Please help me understand where I am going wrong an suggest a fix.

Reply via email to