I guess the best option is to attach a debugger and set a breakpoint at the
NotSerializableException. There definitely has to be a non-serializable
component in that FlinkKafkaConsumer, and it can only come from the
DeserializationSchema or the Properties.
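
You can also reproduce the failure locally, without a cluster, by
serializing the consumer yourself: plain Java serialization is the same
mechanism Flink uses when shipping the job graph. A minimal sketch (the
names in the usage comment are placeholders; build the consumer exactly
as your job does):

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}

    // Serializes any object the way Flink does when distributing
    // operators; throws NotSerializableException naming the offending
    // class on failure.
    def ensureSerializable(obj: AnyRef): Unit = {
      val oos = new ObjectOutputStream(new ByteArrayOutputStream())
      try oos.writeObject(obj)
      finally oos.close()
    }

    // Usage (placeholder names):
    // ensureSerializable(new FlinkKafkaConsumer("topic", schema, props))
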
Maybe the consumer internally caches some values generated by your schema
at some point, but I can't think of anything obvious. There is a high
chance that it comes from your code and only manifests on the cluster.
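
One thing worth double-checking on that front: a @transient lazy val is
itself skipped during serialization, but every plain field its
initializer reads is still written out. A sketch of the pitfall (all
names hypothetical):

    // The lazy val is skipped, but the plain field `helper` that its
    // initializer reads is a regular field and gets serialized.
    class Helper                                  // not Serializable
    class MySchema(url: String) extends Serializable {
      private val helper = new Helper             // serialized -> fails
      @transient lazy val client = (helper, url)  // looks innocent
    }

    // new java.io.ObjectOutputStream(new java.io.ByteArrayOutputStream())
    //   .writeObject(new MySchema("http://reg"))
    // => java.io.NotSerializableException: Helper
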
It would be nice to hear back from you once you have found the respective
field. It should be two object references deep in FlinkKafkaConsumer (two
writeObject0 frames before the first writeArray, which most likely
corresponds to your RecordSchema).
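
If attaching a debugger is awkward, another option is to start the JVM
that serializes the job graph with the (unofficial, HotSpot-only) flag
below; the NotSerializableException stack trace then prints the chain of
fields leading to the offending object:

    -Dsun.io.serialization.extendedDebugInfo=true
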

Btw, which Avro version are you using? It looks like Avro 1.10.x finally
has a serializable Schema... Maybe that also explains why it works in one
submission and not in the other?
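
For reference, the usual workaround on older Avro versions (where
org.apache.avro.Schema is not Serializable) is to carry the schema as
its JSON string and re-parse it lazily per JVM. A minimal sketch,
assuming you hold the schema yourself:

    import org.apache.avro.Schema

    // Common pre-1.10 pattern: the JSON string is Serializable; the
    // parsed Schema is rebuilt lazily on each JVM after deserialization.
    class SerializableSchemaHolder(schemaJson: String) extends Serializable {
      @transient lazy val schema: Schema =
        new Schema.Parser().parse(schemaJson)
    }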

On Fri, Aug 27, 2021 at 4:10 PM Kevin Lam <kevin....@shopify.com> wrote:

> There are no inner classes, and none of the fields
> of DebeziumAvroRegistryDeserializationSchema have an Avro schema, even when
> expanded, including KafkaClusterConfig. KafkaClusterConfig is just composed
> of Strings and Booleans.
>
> DebeziumAvroRegistryDeserializationSchema has a field that initializes an
> io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient, but
> this is marked @transient and lazy in Scala; similarly, the deserializer
> uses that client to initialize a transient+lazy field which builds a
> KafkaAvroDeserializer.
>