I guess the best option is to attach a debugger and set a breakpoint on NotSerializableException. There definitely has to be a non-serializable component in that FlinkKafkaConsumer, and it can only come from the DeserializationSchema or the Properties. Maybe the consumer internally caches some values generated by your schema at some point, but I can't think of anything obvious. There is a high chance that it comes from your code and only manifests on the cluster. It would be nice to hear back from you once you have found the respective field. It should be two object references deep in FlinkKafkaConsumer (two writeObject0 calls before the first writeArray, which most likely corresponds to your RecordSchema).
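If attaching a debugger is inconvenient, you can also force the failure locally before submitting, since Flink ships the consumer with plain Java serialization. Here is a minimal sketch (the topic, properties and SimpleStringSchema are placeholders; substitute your RecordSchema to reproduce the real failure). Running the JVM with -Dsun.io.serialization.extendedDebugInfo=true makes the stack trace print the exact field path:

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}
    import java.util.Properties

    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

    object SerializabilityCheck {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.setProperty("bootstrap.servers", "localhost:9092") // placeholder
        props.setProperty("group.id", "debug")                   // placeholder

        // Swap SimpleStringSchema for your RecordSchema here.
        val consumer =
          new FlinkKafkaConsumer[String]("some-topic", new SimpleStringSchema(), props)

        // Same plain Java serialization Flink performs when shipping the job graph;
        // a NotSerializableException here names the culprit field.
        val out = new ObjectOutputStream(new ByteArrayOutputStream())
        try out.writeObject(consumer)
        finally out.close()
      }
    }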
Btw, which Avro version are you using? It looks like Avro 1.10.X finally has a serializable Schema... Maybe this also explains why it works in one submission and not in the other?

On Fri, Aug 27, 2021 at 4:10 PM Kevin Lam <kevin....@shopify.com> wrote:

> There are no inner classes, and none of the fields of
> DebeziumAvroRegistryDeserializationSchema have an Avro schema, even when
> expanded, including KafkaClusterConfig. KafkaClusterConfig is just composed
> of Strings and Booleans.
>
> DebeziumAvroRegistryDeserializationSchema has a field that initializes a
> io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient, but
> this is marked @transient and lazy in Scala; similarly, the deserializer
> uses that client to initialize a transient+lazy field which builds a
> KafkaAvroDeserializer.
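For comparison, this is how I read the pattern you are describing, with simplified stand-ins for KafkaClusterConfig and the produced record type (the constructor arguments and the deserialize body are my assumptions, not your code):

    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
    import io.confluent.kafka.serializers.KafkaAvroDeserializer
    import org.apache.flink.api.common.serialization.DeserializationSchema
    import org.apache.flink.api.common.typeinfo.TypeInformation

    // Simplified stand-ins for the types from your message:
    case class KafkaClusterConfig(schemaRegistryUrl: String, useSsl: Boolean)
    case class MyRecord(value: String)

    class DebeziumAvroRegistryDeserializationSchema(config: KafkaClusterConfig)
        extends DeserializationSchema[MyRecord] {

      // @transient keeps the non-serializable client out of Java serialization;
      // lazy re-creates it on first use inside each task manager.
      @transient private lazy val registryClient =
        new CachedSchemaRegistryClient(config.schemaRegistryUrl, 100)

      @transient private lazy val deserializer =
        new KafkaAvroDeserializer(registryClient)

      override def deserialize(message: Array[Byte]): MyRecord = {
        // Placeholder body; real code would unwrap the Debezium envelope here.
        MyRecord(String.valueOf(deserializer.deserialize("ignored", message)))
      }

      override def isEndOfStream(nextElement: MyRecord): Boolean = false

      override def getProducedType: TypeInformation[MyRecord] =
        TypeInformation.of(classOf[MyRecord])
    }

If every non-transient field really is just Strings and Booleans, writeObject should succeed on something shaped like this, which is why I suspect some other reference (an enclosing object or a cached value) is being pulled into the serialized graph.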