[ https://issues.apache.org/jira/browse/KAFKA-13505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17454057#comment-17454057 ]
Guus De Graeve commented on KAFKA-13505:
----------------------------------------

[~jcustenborder] we got it to work using your transformer! You have no idea how much you helped us out. Thanks a lot.

For reference, this is how we fixed it in our connector configs:
{code:java}
...
"transforms": "NormalizeSchema",
"transforms.NormalizeSchema.type": "com.github.jcustenborder.kafka.connect.transform.common.NormalizeSchema$Value",
...
{code}

> Kafka Connect should respect Avro 1.10.X enum defaults spec
> ------------------------------------------------------------
>
>                 Key: KAFKA-13505
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13505
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>            Reporter: Guus De Graeve
>            Priority: Major
>
> We are using Kafka Connect to pipe data from Kafka topics into Parquet files on S3. Our Kafka data is serialised using Avro (schema registry), and we use the Amazon S3 Sink Connector for this.
>
> Up until recently we set "schema.compatibility" to "NONE" in our connectors, but this had the painful side effect of huge file explosions (lots of very small files in HDFS / S3) during deploys of our application. This happens because Kafka Connect creates a new file every time the schema id of a record differs from that of the previous record. During deploys of our applications (which can take up to 20 minutes), records with mixed schema ids are inevitable, and given the huge volume of records, file explosions of up to a million files weren't uncommon.
>
> To solve this problem we switched all our connectors' "schema.compatibility" to "BACKWARD", which should only create a new file when a higher schema id is detected and deserialise all records with the latest known schema, so that only one new file is created during a deploy.
>
> An example connector config:
> {code:java}
> {
>     "name": "hdfs-Project_Test_Subject",
>     "config": {
>         "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
>         "partition.duration.ms": "86400000",
>         "topics.dir": "/user/kafka/Project",
>         "hadoop.conf.dir": "/opt/hadoop/conf",
>         "flush.size": "1000000",
>         "schema.compatibility": "BACKWARD",
>         "topics": "Project_Test_Subject",
>         "timezone": "UTC",
>         "hdfs.url": "hdfs://hadoophost:9000",
>         "value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicNameStrategy",
>         "rotate.interval.ms": "7200000",
>         "locale": "C",
>         "hadoop.home": "/opt/hadoop",
>         "logs.dir": "/user/kafka/_logs",
>         "format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat",
>         "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
>         "name": "hdfs-Project_Test_Subject",
>         "errors.tolerance": "all",
>         "storage.class": "io.confluent.connect.hdfs.storage.HdfsStorage",
>         "path.format": "YYYY/MM/dd"
>     }
> }
> {code}
>
> However, we have lots of enum fields in our data records (Avro schemas) to which new values (symbols) get added frequently, and this is causing our Kafka Connect connectors to fail with errors like:
> {code:java}
> Schema parameters not equal.
> source parameters:
> {io.confluent.connect.avro.enum.default.testfield=null,
>  io.confluent.connect.avro.Enum=Ablo.testfield,
>  io.confluent.connect.avro.Enum.null=null,
>  io.confluent.connect.avro.Enum.value1=value1,
>  io.confluent.connect.avro.Enum.value2=value2}
> and target parameters:
> {io.confluent.connect.avro.enum.default.testfield=null,
>  io.confluent.connect.avro.Enum=Ablo.testfield,
>  io.confluent.connect.avro.Enum.null=null,
>  io.confluent.connect.avro.Enum.value1=value1,
>  io.confluent.connect.avro.Enum.value2=value2,
>  io.confluent.connect.avro.Enum.value3=value3}
> {code}
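> To make the evolution concrete, the enum involved looks roughly like this after the change (the field and type names are taken from the parameters above; the default symbol and everything else is illustrative). The previous version was identical except that it did not yet contain "value3", and the declared enum default is what should let readers on the older schema resolve the unknown symbol:
> {code:java}
> {
>   "name": "testfield",
>   "type": {
>     "type": "enum",
>     "name": "testfield",
>     "namespace": "Ablo",
>     "symbols": ["value1", "value2", "value3"],
>     "default": "value1"
>   }
> }
> {code}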
> Since Avro 1.10.X, the specification lets an enum declare a default value, which makes schema evolution possible even when new values (symbols) are added to an enum. When we test our schemas for compatibility using the Schema Registry API we always get "is_compatible" => true, so schema evolution should in theory not be a problem.
>
> The error above is thrown in the *SchemaProjector* class, which is part of Kafka Connect, more specifically in the function *checkMaybeCompatible()* (a rough, paraphrased sketch of that check is included at the end of this description). It seems this function does not respect the Avro 1.10.X specification for enum schema evolution, and I'm not sure whether it is even meant to. As we currently have no other route to fix this issue, and going back to "NONE" schema compatibility is not an option considering the file explosions, we're kinda stuck here.
>
> This issue was discussed in more detail on the Confluent forum in this thread:
> [https://forum.confluent.io/t/should-will-kafka-connect-support-schema-evolution-using-avro-1-10-x-enum-defaults/3076/8]
>
> Adem from Confluent is quite confident this is a bug and asked me to file a bug report here.
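> For reference, the sketch mentioned above: a simplified paraphrase of what *checkMaybeCompatible()* appears to do (not the actual Kafka source, and type-promotion handling is omitted). Projection requires the source and target schemas' parameter maps to be exactly equal, and because the Avro converter records every enum symbol as a schema parameter, any added symbol fails the check before the enum default is ever considered:
> {code:java}
> import java.util.Objects;
>
> import org.apache.kafka.connect.data.Schema;
> import org.apache.kafka.connect.errors.SchemaProjectorException;
>
> public class ProjectionCheckSketch {
>
>     // Simplified paraphrase of SchemaProjector#checkMaybeCompatible
>     // (type promotion handling omitted). The relevant part is the strict
>     // equality check on the two parameter maps.
>     static void checkMaybeCompatible(Schema source, Schema target) {
>         if (source.type() != target.type()) {
>             throw new SchemaProjectorException("Schema type mismatch.");
>         }
>         if (!Objects.equals(source.name(), target.name())) {
>             throw new SchemaProjectorException("Schema name mismatch.");
>         }
>         if (!Objects.equals(source.parameters(), target.parameters())) {
>             // The Avro converter stores each enum symbol as a schema parameter
>             // (io.confluent.connect.avro.Enum.<symbol>), so a newly added symbol
>             // such as "value3" makes the maps unequal and projection fails here,
>             // before the Avro enum default is ever taken into account.
>             throw new SchemaProjectorException("Schema parameters not equal. source parameters: "
>                     + source.parameters() + " and target parameters: " + target.parameters());
>         }
>     }
> }
> {code}
> As far as I understand it, this is also why the NormalizeSchema transform from the comment above works around the problem: it converts records to one consistent schema before the sink ever asks the projector to compare two different ones.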