Hi Naehee,

The serializer for case classes is generated by the Scala macro that is also responsible for implicitly extracting the TypeInformation from your DataStream API program.

It should be possible to use the POJO serializer with case classes. But wouldn't it be easier to just use regular classes (without the `case` keyword)?

When you call `TypeInformation.of(classOf[MyClass])`, it is ensured that the Java type extraction stack is used. You can then verify what kind of TypeInformation it returns.

A case class like:

case class MyClass(var field1: String, var field2: String) {
  // auxiliary public no-argument constructor, as required by the POJO rules
  def this() = this(null, null)
}

should work as a POJO.
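To see what the Scala compiler actually generates for such a class, here is a small sketch that uses only JVM reflection (the `InspectMyClass` object and its helpers are hypothetical, not Flink API). It shows that each `var` becomes a private field exposed through Scala-style accessors `field1()` / `field1_$eq(...)`, not JavaBean-style `getField1` / `setField1`. Whether Flink then treats the class as a POJO is best confirmed with `TypeInformation.of`, as described above.

```scala
import java.lang.reflect.Modifier

// Same class as above: distinct field names plus an auxiliary no-arg constructor.
case class MyClass(var field1: String, var field2: String) {
  def this() = this(null, null)
}

object InspectMyClass {
  // True if the class exposes a public constructor without parameters.
  def hasPublicNoArgConstructor(cls: Class[_]): Boolean =
    cls.getConstructors.exists(_.getParameterCount == 0)

  // For a Scala `var`, reports (field is private, getter `name()` exists, setter `name_$eq(x)` exists).
  def scalaAccessors(cls: Class[_], field: String): (Boolean, Boolean, Boolean) = {
    val isPrivate = Modifier.isPrivate(cls.getDeclaredField(field).getModifiers)
    val hasGetter = cls.getMethods.exists(m => m.getName == field && m.getParameterCount == 0)
    val hasSetter = cls.getMethods.exists(m => m.getName == field + "_$eq" && m.getParameterCount == 1)
    (isPrivate, hasGetter, hasSetter)
  }

  def main(args: Array[String]): Unit = {
    val cls = classOf[MyClass]
    println(s"public no-arg constructor: ${hasPublicNoArgConstructor(cls)}")
    for (name <- Seq("field1", "field2"))
      println(s"$name (private, getter, setter): ${scalaAccessors(cls, name)}")
    // No JavaBean-style getter is generated for a plain Scala var.
    println(s"getField1 present: ${cls.getMethods.exists(_.getName == "getField1")}")
    // The no-arg constructor and the Scala setter can be used directly.
    val m = new MyClass()
    m.field1 = "a"
    println(m)
  }
}
```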

You can then pass this TypeInformation instance explicitly to operators, like:

in.map(func)(TypeInformation.of(classOf[MyClass]))
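The call above works because the Scala API's operators receive their TypeInformation through an implicit parameter (a context bound), which the macro normally fills in; passing an argument in that second parameter list overrides the implicit. The mechanism can be shown without Flink; `TypeInfo` and `ToyStream` below are hypothetical stand-ins, not Flink classes.

```scala
// Hypothetical stand-in for Flink's TypeInformation: it only carries a label.
final case class TypeInfo[T](description: String)

object TypeInfo {
  // Plays the role of the implicitly derived (in Flink: macro-generated) type information.
  implicit val stringInfo: TypeInfo[String] = TypeInfo[String]("implicitly derived")
}

// Hypothetical stand-in for a DataStream: map receives type info through an implicit parameter list.
final class ToyStream[T](val elements: Seq[T], val typeInfo: TypeInfo[T]) {
  def map[R](f: T => R)(implicit info: TypeInfo[R]): ToyStream[R] =
    new ToyStream(elements.map(f), info)
}

object ExplicitTypeInfoDemo {
  def main(args: Array[String]): Unit = {
    val in = new ToyStream(Seq(1, 2, 3), TypeInfo[Int]("int"))
    // Resolved implicitly from the TypeInfo companion object.
    println(in.map(_.toString).typeInfo.description)
    // Passed explicitly in the second parameter list, like TypeInformation.of(...) above.
    println(in.map(_.toString)(TypeInfo[String]("explicitly passed")).typeInfo.description)
  }
}
```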

Let me know if it helped.

Btw, the best schema evolution support might be achieved with Avro instead.

Regards,
Timo

On 13.07.21 00:09, Naehee Kim wrote:
Hi Dawid,

Thanks for your reply. Good to know it is due to historic and compatibility reasons.

The reason I started looking into the POJO rules is to understand whether a Scala case class can conform to them and thereby support schema evolution. In our case, we store several Scala case classes in the RocksDB state backend, and those classes can evolve over time, mostly by simply adding new fields. Each time a class changes, we have to start with fresh state because the Flink job cannot restore from the previous savepoint. It looks like Kryo is used, and Kryo doesn't support schema evolution. We'd like to support schema evolution so that the job can start from a savepoint.
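As a toy illustration of the problem described here: if the serialized form carries no field metadata, data written by the old class version cannot be read once a field is added, whereas a name-based encoding can fill in defaults for fields the old data lacks. Both encodings in `EvolutionToy` are made up for illustration; they are not the actual byte formats of Kryo, the POJO serializer, or Avro.

```scala
// Toy encodings for illustration only; neither is a real Flink/Kryo/Avro format.
object EvolutionToy {
  // Positional encoding: values only, in declaration order, no field names stored.
  def writePositional(values: Seq[String]): String = values.mkString(",")

  // Reading positionally only works if the reader expects exactly the stored field count.
  def readPositional(data: String, fieldCount: Int): Either[String, Seq[String]] = {
    val values = data.split(",", -1).toSeq
    if (values.length == fieldCount) Right(values)
    else Left(s"expected $fieldCount fields but found ${values.length}")
  }

  // Name-based encoding: each value is stored together with its field name.
  def writeNamed(fields: Seq[(String, String)]): String =
    fields.map { case (name, value) => s"$name=$value" }.mkString(",")

  // Reading by name: fields missing from the old data fall back to the reader's defaults.
  def readNamed(data: String, schemaWithDefaults: Seq[(String, String)]): Map[String, String] = {
    val stored: Map[String, String] = data.split(",").toSeq.map { pair =>
      val i = pair.indexOf('=')
      pair.substring(0, i) -> pair.substring(i + 1)
    }.toMap
    schemaWithDefaults.map { case (name, default) => name -> stored.getOrElse(name, default) }.toMap
  }

  def main(args: Array[String]): Unit = {
    // State written by the old class version with two fields.
    val oldPositional = writePositional(Seq("alice", "42"))
    val oldNamed = writeNamed(Seq("name" -> "alice", "age" -> "42"))
    // The new class version adds an "email" field.
    println(readPositional(oldPositional, 3))
    println(readNamed(oldNamed, Seq("name" -> "", "age" -> "0", "email" -> "unknown")))
  }
}
```

Avro's reader/writer schema resolution, where a reader field missing from the writer's schema takes its declared default, rests on a similar idea.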

I have a few questions regarding this. In v1.11 or later:
(1) What is the best way to support schema evolution for Scala case classes if they don't follow the POJO rules? Should I develop my own TypeSerializer with a matching serializer snapshot for the case class?
(2) What is the purpose of ScalaCaseClassSerializer and CaseClassSerializer? Why are there two serializers for case classes?
(3) Why don't case classes have a matching serializer snapshot? Which serializer do Scala case classes fall into? Is it Kryo?

I'd appreciate it if you could answer my questions.

Regards,
Naehee

On Thu, Jul 8, 2021 at 3:25 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

    Hi Naehee,

    The short answer is: historic and compatibility reasons. It was
    implemented that way back in the day, and we don't want to change
    the default type extraction logic. Otherwise, user jobs that rely
    on the default type extraction logic for storing state would end
    up with state that is incompatible with the updated serializer.
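A simplified, hypothetical model of this argument: a snapshot remembers which kind of serializer wrote the state, and restoring with a different kind cannot reuse the stored bytes. The three outcomes loosely echo Flink's `TypeSerializerSchemaCompatibility` (compatible as is, compatible after migration, incompatible), but `Snapshot`, `RestoreCheck`, and the serializer-kind labels are made up for illustration and do not reproduce Flink's actual restore logic.

```scala
// Hypothetical, simplified model; not Flink's actual compatibility resolution.
sealed trait Compatibility
case object CompatibleAsIs extends Compatibility
case object CompatibleAfterMigration extends Compatibility
case object Incompatible extends Compatibility

// What this toy snapshot remembers about the serializer that wrote the state.
final case class Snapshot(serializerKind: String, fieldNames: Seq[String])

object RestoreCheck {
  // Assumption for this toy: only these serializer kinds can migrate a changed field list.
  val evolvable: Set[String] = Set("pojo", "avro")

  def resolve(written: Snapshot, current: Snapshot): Compatibility =
    if (written.serializerKind != current.serializerKind) Incompatible // different format altogether
    else if (written.fieldNames == current.fieldNames) CompatibleAsIs
    else if (evolvable.contains(current.serializerKind)) CompatibleAfterMigration
    else Incompatible

  def main(args: Array[String]): Unit = {
    val stored = Snapshot("kryo", Seq("id", "name"))
    // If changed extraction rules turned this type into a POJO, the serializer kind would change.
    println(resolve(stored, Snapshot("pojo", Seq("id", "name"))))
    // Same kind, one added field, evolvable kind.
    println(resolve(Snapshot("pojo", Seq("id")), Snapshot("pojo", Seq("id", "name"))))
  }
}
```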

    This is not a problem for Table/SQL programs as we control the state
    internally, and that's why we were able to change the requirements
    for POJOs in Table/SQL programs. [1]

    Best,

    Dawid

    [1]
    https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/types/#user-defined-data-types

    On 08/07/2021 00:09, Naehee Kim wrote:
    According to the Flink doc,

    Flink recognizes a data type as a POJO type (and allows “by-name”
    field referencing) if the following conditions are fulfilled:

      * The class is public and standalone (no non-static inner class)
      * The class has a public no-argument constructor
      * All non-static, non-transient fields in the class (and all
        superclasses) are either public (and non-final) or have a
        public getter- and a setter- method that follows the Java
        beans naming conventions for getters and setters.
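The constructor and field rules above can be approximated with plain Java reflection. This sketch (hypothetical `PojoRules` helper and test classes) checks the second and third rules literally as worded in the docs. It skips the "standalone" rule and does not reproduce Flink's own TypeExtractor, which may handle cases such as Scala-style accessors differently.

```scala
import java.lang.reflect.{Field, Modifier}
import scala.beans.BeanProperty

// JavaBean-style class: @BeanProperty adds getName/setName next to Scala's own accessors.
class BeanStyle {
  @BeanProperty var name: String = null
  @BeanProperty var count: Int = 0
}

// Private field without bean accessors: violates the field rule.
class Opaque {
  private var hidden: String = "x"
  def touch(): String = hidden
}

object PojoRules {
  private def capitalize(s: String): String = s.substring(0, 1).toUpperCase + s.substring(1)

  // Rule 2: a public no-argument constructor.
  def hasPublicNoArgConstructor(cls: Class[_]): Boolean =
    cls.getConstructors.exists(_.getParameterCount == 0)

  // Rule 3 for one field: public and non-final, or a public bean getter and setter.
  def fieldOk(cls: Class[_], f: Field): Boolean = {
    val mods = f.getModifiers
    if (Modifier.isPublic(mods) && !Modifier.isFinal(mods)) true
    else {
      val cap = capitalize(f.getName)
      val methods = cls.getMethods
      val getter = methods.exists(m =>
        (m.getName == "get" + cap || m.getName == "is" + cap) && m.getParameterCount == 0)
      val setter = methods.exists(m => m.getName == "set" + cap && m.getParameterCount == 1)
      getter && setter
    }
  }

  // Applies rules 2 and 3 to all non-static, non-transient fields of the class and its superclasses.
  def looksLikePojo(cls: Class[_]): Boolean = {
    val fields = Iterator.iterate[Class[_]](cls)(_.getSuperclass).takeWhile(_ != null)
      .flatMap(_.getDeclaredFields)
      .filter(f => !Modifier.isStatic(f.getModifiers) && !Modifier.isTransient(f.getModifiers))
      .toList
    hasPublicNoArgConstructor(cls) && fields.forall(fieldOk(cls, _))
  }

  def main(args: Array[String]): Unit = {
    println(s"BeanStyle: ${looksLikePojo(classOf[BeanStyle])}")
    println(s"Opaque: ${looksLikePojo(classOf[Opaque])}")
  }
}
```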


    PojoSerializer uses Java reflection to access an object's fields.
    One of PojoSerializer's constructors calls setAccessible(true) on
    all fields:
    for (int i = 0; i < numFields; i++) {
       this.fields[i].setAccessible(true);
    }
    As far as I know, it can then set a field regardless of the
    field's access modifier (private, public, ...).
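The effect of setAccessible(true) described here can be reproduced with plain JVM reflection, independent of Flink. In this sketch (hypothetical `Hidden` class and `AccessibleDemo` helpers), writing a private field fails with IllegalAccessException until setAccessible(true) is called on the Field. On Java 9+ this holds for classes in the same (unnamed) module; module boundaries can restrict setAccessible further.

```scala
import java.lang.reflect.Field

// Hypothetical class whose only field is private and has no setter.
class Hidden(initial: String) {
  private var secret: String = initial
  def reveal: String = secret
}

object AccessibleDemo {
  // getDeclaredFields returns fresh Field objects, so each call starts out non-accessible.
  private def secretField: Field =
    classOf[Hidden].getDeclaredFields.find(_.getName.contains("secret")).get

  // Writing a private field via reflection without setAccessible(true) is rejected.
  def setWithoutAccessible(target: Hidden, value: String): Either[String, Unit] =
    try { secretField.set(target, value); Right(()) }
    catch { case e: IllegalAccessException => Left(e.getClass.getSimpleName) }

  // After setAccessible(true), the same write succeeds despite the private modifier.
  def setWithAccessible(target: Hidden, value: String): Unit = {
    val f = secretField
    f.setAccessible(true)
    f.set(target, value)
  }

  def main(args: Array[String]): Unit = {
    val h = new Hidden("a")
    println(setWithoutAccessible(h, "b"))
    setWithAccessible(h, "b")
    println(h.reveal)
  }
}
```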

    However, its other constructor, which is called by
    PojoSerializerSnapshot, doesn't call setAccessible(true). Does
    anyone know why setAccessible(true) is not called there?
    And why must fields be public or have a public getter and
    setter method?

    Regards,
    Naehee


