[jira] [Commented] (KAFKA-13505) Kafka Connect should respect Avro 1.10.X enum defaults spec
[ https://issues.apache.org/jira/browse/KAFKA-13505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817494#comment-17817494 ] Greg Harris commented on KAFKA-13505:

Hi [~twbecker] Thanks for bringing this issue to our attention. There are a couple of reasons this hasn't been addressed:
* The ticket has been inactive, and other, more active tickets are getting attention from contributors.
* There appears to be a (third-party) workaround as described above by [~jcustenborder], and that repository appears to still be maintained and making releases.
* The SchemaProjector is a utility class which is included in the Connect API but is not used by the framework anywhere, so only some users of some plugins experience this fault.
* Enums are not a first-class object in the Connect Schema, and are instead implemented by plugins using so-called "Logical Types": a first-class type (e.g. STRING) combined with metadata that the plugins understand.

The last one is the most relevant: Connect does an "enum-unaware" comparison in the SchemaProjector, because to Connect the enum is just a strange-looking string. Rather than do something incorrect, the SchemaProjector declares that it does not know how to handle the type and errors out. What is needed is an "enum-aware" schema projector, which can only be implemented by a project that knows what the "enum" is.

For Apache Kafka to solve this problem as described, it would involve adding a special case for this one type, which is not a good solution. An alternative would be to add a first-class ENUM type, but that would present a migration challenge for the existing enum infrastructure. Another alternative is to deprecate and remove this implementation of SchemaProjector and replace it with a SchemaProjector which is extensible to projecting logical types. This would allow the implementors of the ENUM type (or any higher type) to inform the SchemaProjector class of how it should perform projection. I've opened a ticket here: KAFKA-16257

> Kafka Connect should respect Avro 1.10.X enum defaults spec
> ---
>
> Key: KAFKA-13505
> URL: https://issues.apache.org/jira/browse/KAFKA-13505
> Project: Kafka
> Issue Type: Bug
> Components: connect
> Reporter: Guus De Graeve
> Priority: Major
>
> We are using Kafka Connect to pipe data from Kafka topics into parquet files on S3. Our Kafka data is serialised using Avro (schema registry). We use the Amazon S3 Sink Connector for this.
> Up until recently we would set "schema.compatibility" to "NONE" in our connectors, but this had the painful side-effect that during deploys of our application we got huge file explosions (lots of very small files in HDFS / S3). This happens because Kafka Connect will create a new file every time the schema id of a log changes compared to the previous log. During deploys of our applications (which can take up to 20 minutes), multiple logs of mixed schema ids are inevitable, and given the huge amounts of logs, file explosions of up to a million files weren't uncommon.
> To solve this problem we switched all our connectors' "schema.compatibility" to "BACKWARD", which should only create a new file when a higher schema id is detected and deserialise all logs with the latest known schema id, so only one new file should be created during deploys.
> An example connector config:
> {code:java}
> {
>     "name": "hdfs-Project_Test_Subject",
>     "config": {
>         "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
>         "partition.duration.ms": "8640",
>         "topics.dir": "/user/kafka/Project",
>         "hadoop.conf.dir": "/opt/hadoop/conf",
>         "flush.size": "100",
>         "schema.compatibility": "BACKWARD",
>         "topics": "Project_Test_Subject",
>         "timezone": "UTC",
>         "hdfs.url": "hdfs://hadoophost:9000",
>         "value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicNameStrategy",
>         "rotate.interval.ms": "720",
>         "locale": "C",
>         "hadoop.home": "/opt/hadoop",
>         "logs.dir": "/user/kafka/_logs",
>         "format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat",
>         "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
>         "name": "hdfs-Project_Test_Subject",
>         "errors.tolerance": "all",
>         "storage.class": "io.confluent.connect.hdfs.storage.HdfsStorage",
>         "path.format": "/MM/dd"
>     }
> }{code}
> However, we have lots of enum fields in our data records (avro schemas) to which symbols (values) get added frequently, and this is causing issues with our Kafka Connect connectors FAILING with these kinds of errors:
> {code:java}
> Schema parameters not equal. source parameters: {io.confluent.connect.avro.enum.default.testfield=null, io.confluent.connect.avro.Enum=Ablo.testfield, io.confluent.connect.avro.Enum.null=null, io.confluent.connect.avro.Enum.value1=value1, io.confluent.connect.avro.Enum.value2=value2} and target parameters: {io.confluent.connect.avro.enum.default.testfield=null, io.confluent.connect.avro.Enum=Ablo.testfield, io.confluent.connect.avro.Enum.null=null, io.confluent.connect.avro.Enum.value1=value1, io.confluent.connect.avro.Enum.value2=value2, io.confluent.connect.avro.Enum.value3=value3}{code}
> Since the Avro 1.10.X specification, enum values support defaults, which makes schema evolution possible even when adding symbols (values) to an enum. When testing our schemas for compatibility using the Schema Registry API we always get "is_compatible" => true, so schema evolution should in theory not be a problem.
> The error above is thrown in the *SchemaProjector* class, which is part of Kafka Connect, more specifically in the function *checkMaybeCompatible()*. It seems like this function is not respecting the Avro 1.10.X specification for enum schema evolution, and I'm not sure if it is meant to respect it. As we currently don't have any other routes to fix this issue and returning to the "NONE" schema compatibility is not an option considering the file explosions, we're kinda stuck here.
> This issue was discussed in more detail on the Confluent forum in this thread: [https://forum.confluent.io/t/should-will-kafka-connect-support-schema-evolution-using-avro-1-10-x-enum-defaults/3076/8]
> Adem from Confluent is quite confident this is a bug and asked m
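For reference, the "default" the ticket relies on is an attribute on the Avro enum type itself (supported since Avro 1.9/1.10); it names the symbol a reader should fall back to when it encounters an unknown symbol. The schema below is a hypothetical reconstruction based only on the parameter names in the error above; the record name, symbols, and chosen default are assumptions, not the reporter's actual schema:
{code:java}
{
  "type": "record",
  "name": "TestRecord",
  "namespace": "Ablo",
  "fields": [
    {
      "name": "testfield",
      "type": {
        "type": "enum",
        "name": "testfield",
        "symbols": ["value1", "value2", "value3"],
        "default": "value1"
      }
    }
  ]
}
{code}
On the Connect side there is no enum type at all: the converter hands the framework a plain STRING schema whose parameters carry the symbols, which is exactly what the error message prints. The following is a minimal sketch (not the actual converter or connector code) of how the projection then fails, using only the public Connect API and made-up parameter values modelled on the error above:
{code:java}
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.SchemaProjector;
import org.apache.kafka.connect.errors.SchemaProjectorException;

public class EnumProjectionSketch {
    public static void main(String[] args) {
        // "Enum" as the AvroConverter encodes it: a STRING plus one parameter per symbol.
        Schema source = SchemaBuilder.string()
                .parameter("io.confluent.connect.avro.Enum", "Ablo.testfield")
                .parameter("io.confluent.connect.avro.Enum.value1", "value1")
                .parameter("io.confluent.connect.avro.Enum.value2", "value2")
                .build();
        // The same field after a new symbol was added to the Avro enum.
        Schema target = SchemaBuilder.string()
                .parameter("io.confluent.connect.avro.Enum", "Ablo.testfield")
                .parameter("io.confluent.connect.avro.Enum.value1", "value1")
                .parameter("io.confluent.connect.avro.Enum.value2", "value2")
                .parameter("io.confluent.connect.avro.Enum.value3", "value3")
                .build();
        try {
            // SchemaProjector only sees two STRING schemas with unequal parameter maps,
            // so it refuses the projection rather than risk doing something incorrect.
            SchemaProjector.project(source, "value1", target);
        } catch (SchemaProjectorException e) {
            System.out.println("Projection rejected: " + e.getMessage());
        }
    }
}
{code}
The single extra io.confluent.connect.avro.Enum.value3 entry is enough to make the parameter maps unequal, so the projection is rejected even though Avro and Schema Registry both consider the change backward compatible.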
[jira] [Commented] (KAFKA-13505) Kafka Connect should respect Avro 1.10.X enum defaults spec
[ https://issues.apache.org/jira/browse/KAFKA-13505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817385#comment-17817385 ] Tommy Becker commented on KAFKA-13505:

I'm wondering if this bug is being overlooked because it's described in terms of third-party libraries like Confluent's S3 connector and Avro. But the issue is actually in the SchemaProjector class, which is part of Kafka Connect itself. Confluent's AvroConverter represents Avro enums in the Connect Schema as Strings with "parameters" corresponding to each enum value. Even though these parameters seem like they are just intended to be metadata, SchemaProjector.checkMaybeCompatible() requires the parameters of each field to *exactly* match, so when values are added to an Avro enum, new parameters are added to the generated Connect Schema, breaking this check. Intuitively, it feels like this check could be less strict, but there is no specification for the Connect Schema that I can find, so I don't know whether requiring this parameter equality is correct or not.
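To make the strictness concrete, here is a paraphrased sketch of the check being described; it is an illustration of the behaviour, not the literal Kafka Connect source:
{code:java}
import java.util.Map;
import java.util.Objects;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.SchemaProjectorException;

final class ParameterEqualitySketch {
    // Any difference between the two parameter maps, such as one newly added
    // enum symbol, is enough to reject the projection outright.
    static void checkParameters(Schema source, Schema target) {
        Map<String, String> sourceParams = source.parameters();
        Map<String, String> targetParams = target.parameters();
        if (!Objects.equals(sourceParams, targetParams)) {
            throw new SchemaProjectorException("Schema parameters not equal. source parameters: "
                    + sourceParams + " and target parameters: " + targetParams);
        }
    }
}
{code}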
[jira] [Commented] (KAFKA-13505) Kafka Connect should respect Avro 1.10.X enum defaults spec
[ https://issues.apache.org/jira/browse/KAFKA-13505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17455964#comment-17455964 ] Jeremy Custenborder commented on KAFKA-13505:

Glad you got it working. Sorry about the delay, I totally missed the notification from JIRA.
[jira] [Commented] (KAFKA-13505) Kafka Connect should respect Avro 1.10.X enum defaults spec
[ https://issues.apache.org/jira/browse/KAFKA-13505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17454057#comment-17454057 ] Guus De Graeve commented on KAFKA-13505:

[~jcustenborder] we got it to work using your transformer! You have no idea how much you helped us out. Thanks a lot.

For reference, this is how we fixed it in our connector configs:
{code:java}
...
"transforms": "NormalizeSchema",
"transforms.NormalizeSchema.type": "com.github.jcustenborder.kafka.connect.transform.common.NormalizeSchema$Value",
...
{code}
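For anyone finding this later: the two transform keys above slot into the sink connector configuration like any other SMT. A trimmed, hypothetical example combining them with the HDFS sink config from the ticket description (only the relevant keys shown) might look like:
{code:java}
{
  "name": "hdfs-Project_Test_Subject",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "topics": "Project_Test_Subject",
    "schema.compatibility": "BACKWARD",
    "transforms": "NormalizeSchema",
    "transforms.NormalizeSchema.type": "com.github.jcustenborder.kafka.connect.transform.common.NormalizeSchema$Value"
  }
}
{code}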
[jira] [Commented] (KAFKA-13505) Kafka Connect should respect Avro 1.10.X enum defaults spec
[ https://issues.apache.org/jira/browse/KAFKA-13505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17453951#comment-17453951 ] Guus De Graeve commented on KAFKA-13505:

[~jcustenborder] that would massively help us out. Do you have an example of how I can use this transformer in my connector config? Thanks a lot!
[jira] [Commented] (KAFKA-13505) Kafka Connect should respect Avro 1.10.X enum defaults spec
[ https://issues.apache.org/jira/browse/KAFKA-13505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17453177#comment-17453177 ] Jeremy Custenborder commented on KAFKA-13505:

I was aiming to help you with the current situation you are seeing, given I've seen this exact problem before. The Confluent storage connectors check for unique schemas, and that's why the files rotate. I was working with someone who was having a similar issue.
[jira] [Commented] (KAFKA-13505) Kafka Connect should respect Avro 1.10.X enum defaults spec
[ https://issues.apache.org/jira/browse/KAFKA-13505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17453139#comment-17453139 ] Guus De Graeve commented on KAFKA-13505:

[~jcustenborder] not sure if this reply was aimed at the developers who would normally work on this issue or at me? If it was for me, how would I use this normalisation within a Kafka Connect connector? Is this done using the connector config?
[jira] [Commented] (KAFKA-13505) Kafka Connect should respect Avro 1.10.X enum defaults spec
[ https://issues.apache.org/jira/browse/KAFKA-13505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17453138#comment-17453138 ] Jeremy Custenborder commented on KAFKA-13505:

Not sure if this helps. I ran into a similar issue and put together this transformation to normalize incoming schemas to the latest version. It was specifically to reduce small files caused by schema churn. [NormalizeSchema|https://github.com/jcustenborder/kafka-connect-transform-common/blob/master/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/NormalizeSchema.java]