[ 
https://issues.apache.org/jira/browse/KAFKA-13505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817385#comment-17817385
 ] 

Tommy Becker commented on KAFKA-13505:
--------------------------------------

I'm wondering if this bug is being overlooked because it's described in terms 
of third party libraries like Confluent's S3 connector and Avro. But the issue 
is actually in the SchemaProjector class, which is part of Kafka Connect 
itself. Confluent's AvroConverter represents Avro enums in Connect Schema as 
Strings with "parameters" corresponding to each enum value. Even though these 
parameters seem like they are just intended to be metadata, 
SchemaProjector.checkMaybeCompatible() requires the parameters of each field to 
*exactly* match, so when values are added to an Avro enum, new parameters are 
added to the generated Connect Schema, breaking this test. Intuitively, it 
feels like this check could be less strict, but there is no specification for 
Connect Schema that I can find, so I don't know if requiring this parameter 
equality is correct or not.

> Kafka Connect should respect Avro 1.10.X enum defaults spec
> -----------------------------------------------------------
>
>                 Key: KAFKA-13505
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13505
>             Project: Kafka
>          Issue Type: Bug
>          Components: connect
>            Reporter: Guus De Graeve
>            Priority: Major
>
> We are using Kafka Connect to pipe data from Kafka topics into parquet files 
> on S3. Our Kafka data is serialised using Avro (schema registry). We use the 
> Amazon S3 Sink Connector for this.
> Up until recently we would set "schema.compatibility" to "NONE" in our 
> connectors, but this had the pain-full side-effect that during deploys of our 
> application we got huge file explosions (lots of very small files in HDFS / 
> S3). This happens because kafka connect will create a new file every time the 
> schema id of a log changes compared to the previous log. During deploys of 
> our applications (which can take up to 20 minutes) multiple logs of mixed 
> schema ids are inevitable and given the huge amounts of logs file explosions 
> of up to a million files weren't uncommon.
> To solve this problem we switched all our connectors "schema.compatibility" 
> to "BACKWARD", which should only create a new file when a higher schema id is 
> detected and deserialise all logs with the latest known schema id. Which 
> should only create one new file during deploys.
> An example connector config:
> {code:java}
> {
>     "name": "hdfs-Project_Test_Subject",
>     "config": {
>         "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
>         "partition.duration.ms": "86400000",
>         "topics.dir": "/user/kafka/Project",
>         "hadoop.conf.dir": "/opt/hadoop/conf",
>         "flush.size": "1000000",
>         "schema.compatibility": "BACKWARD",
>         "topics": "Project_Test_Subject",
>         "timezone": "UTC",
>         "hdfs.url": "hdfs://hadoophost:9000",
>         "value.converter.value.subject.name.strategy": 
> "io.confluent.kafka.serializers.subject.TopicNameStrategy",
>         "rotate.interval.ms": "7200000",
>         "locale": "C",
>         "hadoop.home": "/opt/hadoop",
>         "logs.dir": "/user/kafka/_logs",
>         "format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat",
>         "partitioner.class": 
> "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
>         "name": "hdfs-Project_Test_Subject",
>         "errors.tolerance": "all",
>         "storage.class": "io.confluent.connect.hdfs.storage.HdfsStorage",
>         "path.format": "YYYY/MM/dd"
>     }
> }{code}
> However, we have lots of enum fields in our data records (avro schemas) to 
> which subjects get added frequently, and this is causing issues with our 
> Kafka Connect connectors FAILING with these kinds of errors:
> {code:java}
> Schema parameters not equal. source parameters: 
> {io.confluent.connect.avro.enum.default.testfield=null, 
> io.confluent.connect.avro.Enum=Ablo.testfield, 
> io.confluent.connect.avro.Enum.null=null, 
> io.confluent.connect.avro.Enum.value1=value1, 
> io.confluent.connect.avro.Enum.value2=value2} and target parameters: 
> {io.confluent.connect.avro.enum.default.testfield=null, 
> io.confluent.connect.avro.Enum=Ablo.testfield, 
> io.confluent.connect.avro.Enum.null=null, 
> io.confluent.connect.avro.Enum.value1=value1, 
> io.confluent.connect.avro.Enum.value2=value2, 
> io.confluent.connect.avro.Enum.value3=value3}{code}
> Since Avro 1.10.X specification, enum values support defaults, which makes 
> schema evolution possible even when adding subjects (values) to an enum. When 
> testing our schemas for compatibility using the Schema Registry api we always 
> get "is_compatible" => true. So schema evolution should in theory not be a 
> problem.
> The error above is thrown in the *SchemaProjector* class which is part of 
> Kafka Connect, more specifically in the function 
> {*}checkMaybeCompatible(){*}. It seems like this function is not respecting 
> the Avro 1.10.X specification for enum schema evolution, and I'm not sure if 
> it is meant to respect it? As we currently don't have any other routes to fix 
> this issue and returning to the "NONE" schema compatibility is no options 
> considering the file explosions, we're kinda stuck here.
> This issue was discussed more in detail on the Confluent forum in this thread:
> [https://forum.confluent.io/t/should-will-kafka-connect-support-schema-evolution-using-avro-1-10-x-enum-defaults/3076/8]
> Adem from Confluent is quite confident this is a bug and asked me to file a 
> bug report here.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to