[ 
https://issues.apache.org/jira/browse/KAFKA-13505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17453139#comment-17453139
 ] 

Guus De Graeve commented on KAFKA-13505:
----------------------------------------

[~jcustenborder] not sure if this reply was aimed at the developers that would 
normally work on this issue or at myself? If it was for me, how would I use 
this normalisation within a Kafka Connect connector? Is this done using the 
connector config?

> Kafka Connect should respect Avro 1.10.X enum defaults spec
> -----------------------------------------------------------
>
>                 Key: KAFKA-13505
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13505
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>            Reporter: Guus De Graeve
>            Priority: Major
>
> We are using Kafka Connect to pipe data from Kafka topics into parquet files 
> on S3. Our Kafka data is serialised using Avro (schema registry). We use the 
> Amazon S3 Sink Connector for this.
> Up until recently we would set "schema.compatibility" to "NONE" in our 
> connectors, but this had the pain-full side-effect that during deploys of our 
> application we got huge file explosions (lots of very small files in HDFS / 
> S3). This happens because kafka connect will create a new file every time the 
> schema id of a log changes compared to the previous log. During deploys of 
> our applications (which can take up to 20 minutes) multiple logs of mixed 
> schema ids are inevitable and given the huge amounts of logs file explosions 
> of up to a million files weren't uncommon.
> To solve this problem we switched all our connectors "schema.compatibility" 
> to "BACKWARD", which should only create a new file when a higher schema id is 
> detected and deserialise all logs with the latest known schema id. Which 
> should only create one new file during deploys.
> An example connector config:
> {code:java}
> {
>     "name": "hdfs-Project_Test_Subject",
>     "config": {
>         "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
>         "partition.duration.ms": "86400000",
>         "topics.dir": "/user/kafka/Project",
>         "hadoop.conf.dir": "/opt/hadoop/conf",
>         "flush.size": "1000000",
>         "schema.compatibility": "BACKWARD",
>         "topics": "Project_Test_Subject",
>         "timezone": "UTC",
>         "hdfs.url": "hdfs://hadoophost:9000",
>         "value.converter.value.subject.name.strategy": 
> "io.confluent.kafka.serializers.subject.TopicNameStrategy",
>         "rotate.interval.ms": "7200000",
>         "locale": "C",
>         "hadoop.home": "/opt/hadoop",
>         "logs.dir": "/user/kafka/_logs",
>         "format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat",
>         "partitioner.class": 
> "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
>         "name": "hdfs-Project_Test_Subject",
>         "errors.tolerance": "all",
>         "storage.class": "io.confluent.connect.hdfs.storage.HdfsStorage",
>         "path.format": "YYYY/MM/dd"
>     }
> }{code}
> However, we have lots of enum fields in our data records (avro schemas) to 
> which subjects get added frequently, and this is causing issues with our 
> Kafka Connect connectors FAILING with these kinds of errors:
> {code:java}
> Schema parameters not equal. source parameters: 
> {io.confluent.connect.avro.enum.default.testfield=null, 
> io.confluent.connect.avro.Enum=Ablo.testfield, 
> io.confluent.connect.avro.Enum.null=null, 
> io.confluent.connect.avro.Enum.value1=value1, 
> io.confluent.connect.avro.Enum.value2=value2} and target parameters: 
> {io.confluent.connect.avro.enum.default.testfield=null, 
> io.confluent.connect.avro.Enum=Ablo.testfield, 
> io.confluent.connect.avro.Enum.null=null, 
> io.confluent.connect.avro.Enum.value1=value1, 
> io.confluent.connect.avro.Enum.value2=value2, 
> io.confluent.connect.avro.Enum.value3=value3}{code}
> Since Avro 1.10.X specification, enum values support defaults, which makes 
> schema evolution possible even when adding subjects (values) to an enum. When 
> testing our schemas for compatibility using the Schema Registry api we always 
> get "is_compatible" => true. So schema evolution should in theory not be a 
> problem.
> The error above is thrown in the *SchemaProjector* class which is part of 
> Kafka Connect, more specifically in the function 
> {*}checkMaybeCompatible(){*}. It seems like this function is not respecting 
> the Avro 1.10.X specification for enum schema evolution, and I'm not sure if 
> it is meant to respect it? As we currently don't have any other routes to fix 
> this issue and returning to the "NONE" schema compatibility is no options 
> considering the file explosions, we're kinda stuck here.
> This issue was discussed more in detail on the Confluent forum in this thread:
> [https://forum.confluent.io/t/should-will-kafka-connect-support-schema-evolution-using-avro-1-10-x-enum-defaults/3076/8]
> Adem from Confluent is quite confident this is a bug and asked me to file a 
> bug report here.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to