Hi CZ,

The issue here is that the Scala DataStream API uses Scala macros to decide
which serializer to use. Since the macro recognizes Scala case classes, the
CaseClassSerializer is used.
In the State Processor API, however, those Scala macros do not come into
play, so it goes directly to Flink's type extraction for Java classes,
which recognizes the class as an Avro-generated class.
In general, the State Processor API currently doesn't support savepoints
written by Scala DataStream jobs particularly well.

You can try using the @TypeInfo annotation to specify a TypeInfoFactory
for your key class [1].
This allows you to "plug in" the TypeInformation that Flink extracts for a
given class. That custom TypeInformation should then return the serializer
you want, in this case the CaseClassSerializer.
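
Roughly, the factory could look something like the sketch below (untested;
MyKey, its fields, and MyKeyTypeInfoFactory are just stand-ins for your
generated key class, and the exact constructors of CaseClassTypeInfo /
CaseClassSerializer may differ slightly between Flink versions). The idea is
to return a TypeInformation that mirrors what the Scala API's macro would
derive, so that its serializer is the same CaseClassSerializer the streaming
job used:

    import java.lang.reflect.Type
    import java.util.{Map => JMap}

    import org.apache.flink.api.common.ExecutionConfig
    import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInfoFactory, TypeInformation, Types}
    import org.apache.flink.api.common.typeutils.TypeSerializer
    import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassTypeInfo}

    // Hypothetical stand-in for the avrohugger-generated key class.
    @TypeInfo(classOf[MyKeyTypeInfoFactory])
    case class MyKey(id: String, bucket: Int)

    class MyKeyTypeInfoFactory extends TypeInfoFactory[MyKey] {
      override def createTypeInfo(
          t: Type,
          genericParameters: JMap[String, TypeInformation[_]]): TypeInformation[MyKey] = {

        // Per-field TypeInformation; these must match what the Scala
        // DataStream job derived for the same fields.
        val fieldTypes: Seq[TypeInformation[_]] = Seq(Types.STRING, Types.INT)
        val fieldNames = Seq("id", "bucket")

        new CaseClassTypeInfo[MyKey](
            classOf[MyKey], Array.empty[TypeInformation[_]], fieldTypes, fieldNames) {
          override def createSerializer(config: ExecutionConfig): TypeSerializer[MyKey] = {
            val fieldSerializers = fieldTypes.map(_.createSerializer(config)).toArray
            // CaseClassSerializer writes the fields in declaration order,
            // which is the format the streaming job's serializer produced.
            new CaseClassSerializer[MyKey](classOf[MyKey], fieldSerializers) {
              override def createInstance(fields: Array[AnyRef]): MyKey =
                MyKey(fields(0).asInstanceOf[String], fields(1).asInstanceOf[Int])
            }
          }
        }
      }
    }

Note that the per-field TypeInformation (and therefore the field serializers)
must line up exactly with what the streaming job used, otherwise reading the
keyed state will still fail with a serializer incompatibility.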

Best,
Gordon

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#defining-type-information-using-a-factory

On Mon, Mar 29, 2021 at 2:42 PM ChangZhuo Chen (陳昌倬) <czc...@czchen.org>
wrote:

> Hi,
>
> Currently we use sbt-avrohugger [0] to generate the key class for keyed
> state.  The key class generated by sbt-avrohugger is both a case class
> and an Avro specific record. However, in the following scenarios, Flink
> uses different serializers:
>
>
> * In streaming application, Flink uses CaseClassSerializer for key
>   class.
> * In state processor API application, Flink uses AvroSerializer for key
>   class.
>
>
> Since they use different serializers for the key, they are not compatible.
> Is there any way to specify the key serializer so that both applications
> use the same serializer?
>
>
>
> [0] https://github.com/julianpeeters/sbt-avrohugger
>
> --
> ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
> http://czchen.info/
> Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B
>
