Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5995#discussion_r188386920

    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java ---
    @@ -275,9 +304,19 @@ else if (configSnapshot instanceof AvroSerializerConfigSnapshot) {
         // Utilities
         // ------------------------------------------------------------------------

    +    private static boolean isGenericRecord(Class<?> type) {
    +        return !SpecificRecord.class.isAssignableFrom(type) &&
    +            GenericRecord.class.isAssignableFrom(type);
    +    }
    +
         @Override
         public TypeSerializer<T> duplicate() {
    -        return new AvroSerializer<>(type);
    +        if (schemaString != null) {
    +            return new AvroSerializer<>(type, new Schema.Parser().parse(schemaString));
    --- End diff --

    I didn't think this through well. I thought we needed to create a deep copy of the schema, but since the schema is stateless, I think we can just pass it to the new serializer directly instead of re-parsing it. My mistake. Correct me if I am wrong.
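
To illustrate the point about passing the schema rather than re-parsing it, here is a minimal, self-contained sketch. It does not use Avro or Flink: `FakeSchema` and `SketchSerializer` are hypothetical stand-ins invented for this example, and the sketch assumes the schema object is effectively immutable, which is the premise of the comment. Under that assumption, `duplicate()` can return a new serializer instance that shares the same schema reference.

```java
import java.util.Objects;

final class DuplicateSketch {

    // Hypothetical stand-in for an immutable schema; NOT org.apache.avro.Schema.
    static final class FakeSchema {
        private final String json;

        FakeSchema(String json) {
            this.json = Objects.requireNonNull(json);
        }

        String toJson() {
            return json;
        }
    }

    // Hypothetical, simplified serializer; only duplicate() mirrors the idea discussed above.
    static final class SketchSerializer<T> {
        private final Class<T> type;
        private final FakeSchema schema; // null when no explicit schema was supplied

        SketchSerializer(Class<T> type) {
            this(type, null);
        }

        SketchSerializer(Class<T> type, FakeSchema schema) {
            this.type = Objects.requireNonNull(type);
            this.schema = schema;
        }

        // New serializer instance, same schema reference: no deep copy, no re-parse.
        SketchSerializer<T> duplicate() {
            if (schema != null) {
                return new SketchSerializer<>(type, schema);
            }
            return new SketchSerializer<>(type);
        }

        FakeSchema schema() {
            return schema;
        }
    }

    public static void main(String[] args) {
        SketchSerializer<String> original =
            new SketchSerializer<>(String.class, new FakeSchema("{\"type\":\"string\"}"));
        SketchSerializer<String> copy = original.duplicate();

        System.out.println(copy != original);                   // prints "true"
        System.out.println(copy.schema() == original.schema()); // prints "true"
    }
}
```

The serializer itself is still duplicated, so any per-instance mutable state stays separate; only the read-only schema is shared. If the schema could be mutated after construction, this sharing would not be safe and a copy would be needed after all.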
---