[jira] [Comment Edited] (KAFKA-13505) Kafka Connect should respect Avro 1.10.X enum defaults spec

2024-02-14 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817494#comment-17817494
 ] 

Greg Harris edited comment on KAFKA-13505 at 2/14/24 7:01 PM:
--

Hi [~twbecker], thanks for bringing this issue to our attention. There are a 
couple of reasons this hasn't been addressed:
 * The ticket has been inactive, and other more active tickets are getting 
attention from contributors.
 * There appears to be a (third-party) workaround, described above by 
[~jcustenborder], and that repository still appears to be maintained and to 
publish releases.
 * The SchemaProjector is a utility class included in the Connect API but not 
used anywhere by the framework itself, so only users of certain plugins 
experience this fault.
 * Enums are not a first-class type in the Connect Schema; plugins instead 
implement them as so-called "Logical Types": a first-class type (e.g. STRING) 
combined with metadata that the plugins understand (see the sketch after this 
list).
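
To make the "Logical Types" point concrete, here is a rough, hypothetical 
sketch (illustrative only; the class name is invented and the real mapping is 
done by the Avro converter, not by code like this) of what an Avro enum can 
look like by the time it reaches Connect, using parameter names visible in the 
error output quoted later in this ticket:
{code:java}
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public class EnumAsLogicalTypeSketch {
    public static void main(String[] args) {
        // An Avro enum surfaces in Connect as a plain STRING schema whose parameters
        // carry the enum metadata. The real converter adds more entries (e.g. the
        // enum default marker seen in the error output); this is a trimmed example.
        Schema enumishString = SchemaBuilder.string()
                .parameter("io.confluent.connect.avro.Enum", "Ablo.testfield")
                .parameter("io.confluent.connect.avro.Enum.value1", "value1")
                .parameter("io.confluent.connect.avro.Enum.value2", "value2")
                .optional()
                .build();

        // To the SchemaProjector this is just a STRING with opaque parameters, so a
        // target schema that also carries "value3" has an unequal parameter map and
        // projection fails rather than falling back to the enum default.
        System.out.println(enumishString.parameters());
    }
}
{code}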

The last point is the most relevant: Connect does an "enum-unaware" comparison 
in the SchemaProjector, because to Connect the enum is just a strange-looking 
string. Rather than do something incorrect, the SchemaProjector declares that 
it does not know how to handle the type, and errors out.

What is needed is an "enum-aware" schema projector, which can only be 
implemented by a project that knows what the "enum" is. For Apache Kafka to 
solve this problem as described, it would mean adding a special case for this 
one type, which is not a good solution.

An alternative would be to add a first-class ENUM type, but that would present 
a migration challenge for the existing enum infrastructure. Another alternative 
is to make SchemaProjector extensible for projecting logical types, which would 
allow the implementors of the ENUM type (or any higher-level type) to tell the 
SchemaProjector class how to perform the projection. I've opened a ticket here: 
KAFKA-16257
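
For illustration only, the hook could look something like the sketch below. The 
interface name and method signatures are invented for this comment and are not 
the actual KAFKA-16257 proposal:
{code:java}
import org.apache.kafka.connect.data.Schema;

/**
 * Hypothetical extension point, invented here for illustration; it is NOT the API
 * proposed in KAFKA-16257. A logical-type owner (for example, an Avro converter
 * that understands the "io.confluent.connect.avro.Enum*" parameters) could register
 * an implementation to take over projection for schemas it recognizes.
 */
public interface LogicalTypeProjection {

    /** Return true if this plugin understands the given source/target schema pair. */
    boolean canProject(Schema source, Schema target);

    /**
     * Project a value written with the source schema into the target schema, for
     * example by treating a missing enum symbol as the enum's declared default
     * instead of failing the projection.
     */
    Object project(Schema source, Object value, Schema target);
}
{code}
With a hook like this, the plugin that knows what the "enum" parameters mean 
could supply the enum-aware projection that the framework itself cannot.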


was (Author: gharris1727):
Hi [~twbecker], thanks for bringing this issue to our attention. There are a 
couple of reasons this hasn't been addressed:

* The ticket has been inactive, and other more active tickets are getting 
attention from contributors.
* There appears to be a (third-party) workaround, described above by 
[~jcustenborder], and that repository still appears to be maintained and to 
publish releases.
* The SchemaProjector is a utility class included in the Connect API but not 
used anywhere by the framework itself, so only users of certain plugins 
experience this fault.
* Enums are not a first-class type in the Connect Schema; plugins instead 
implement them as so-called "Logical Types": a first-class type (e.g. STRING) 
combined with metadata that the plugins understand.

The last point is the most relevant: Connect does an "enum-unaware" comparison 
in the SchemaProjector, because to Connect the enum is just a strange-looking 
string. Rather than do something incorrect, the SchemaProjector declares that 
it does not know how to handle the type, and errors out.

What is needed is an "enum-aware" schema projector, which can only be 
implemented by a project that knows what the "enum" is. For Apache Kafka to 
solve this problem as described, it would mean adding a special case for this 
one type, which is not a good solution.

An alternative would be to add a first-class ENUM type, but that would present 
a migration challenge for the existing enum infrastructure. Another alternative 
is to deprecate and remove this implementation of SchemaProjector and replace 
it with one that is extensible for projecting logical types. This would allow 
the implementors of the ENUM type (or any higher-level type) to tell the 
SchemaProjector class how to perform the projection. I've opened a ticket here: 
KAFKA-16257

> Kafka Connect should respect Avro 1.10.X enum defaults spec
> ---
>
> Key: KAFKA-13505
> URL: https://issues.apache.org/jira/browse/KAFKA-13505
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Guus De Graeve
>Priority: Major
>
> We are using Kafka Connect to pipe data from Kafka topics into parquet files 
> on S3. Our Kafka data is serialised using Avro (schema registry). We use the 
> Amazon S3 Sink Connector for this.
> Up until recently we would set "schema.compatibility" to "NONE" in our 
> connectors, but this had the painful side effect that during deploys of our 
> application we got huge file explosions (lots of very small files in HDFS / 
> S3). This happens because Kafka Connect will create a new file every time the 
> schema id of a log changes compared to the previous log. During deploys of 
> our applications (which can take up 

[jira] [Comment Edited] (KAFKA-13505) Kafka Connect should respect Avro 1.10.X enum defaults spec

2021-12-06 Thread Guus De Graeve (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17453951#comment-17453951
 ] 

Guus De Graeve edited comment on KAFKA-13505 at 12/6/21, 11:54 AM:
---

[~jcustenborder] that would massively help us out. Do you have an example of 
how I can use this transformer in my connector config? Thanks a lot!

Would it be something like below?
{code:java}
...
"transforms": "NormalizeSchema",
"transforms.NormalizeSchema.type": 
"com.github.jcustenborder.kafka.connect.transform.common.NormalizeSchema",
...
{code}
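
In other words, something like this fuller sketch, where everything around the 
two transform lines is copied from the example config in the description and 
trimmed (illustrative only, not a complete or verified configuration)?
{code:java}
{
    "name": "hdfs-Project_Test_Subject",
    "config": {
        "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
        "topics": "Project_Test_Subject",
        "flush.size": "100",
        "schema.compatibility": "BACKWARD",
        "transforms": "NormalizeSchema",
        "transforms.NormalizeSchema.type": "com.github.jcustenborder.kafka.connect.transform.common.NormalizeSchema"
    }
}
{code}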


was (Author: JIRAUSER281185):
[~jcustenborder] that would massively help us out. Do you have an example of 
how I can use this transformer in my connector config? Thanks a lot!

Would it be something like below?
{code:java}
"transforms": "NormalizeSchema",
"transforms.NormalizeSchema.type": 
"com.github.jcustenborder.kafka.connect.transform.common.NormalizeSchema",{code}

> Kafka Connect should respect Avro 1.10.X enum defaults spec
> ---
>
> Key: KAFKA-13505
> URL: https://issues.apache.org/jira/browse/KAFKA-13505
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Guus De Graeve
>Priority: Major
>
> We are using Kafka Connect to pipe data from Kafka topics into parquet files 
> on S3. Our Kafka data is serialised using Avro (schema registry). We use the 
> Amazon S3 Sink Connector for this.
> Up until recently we would set "schema.compatibility" to "NONE" in our 
> connectors, but this had the painful side effect that during deploys of our 
> application we got huge file explosions (lots of very small files in HDFS / 
> S3). This happens because Kafka Connect will create a new file every time the 
> schema id of a log changes compared to the previous log. During deploys of 
> our applications (which can take up to 20 minutes) multiple logs of mixed 
> schema ids are inevitable, and given the huge amounts of logs, file explosions 
> of up to a million files weren't uncommon.
> To solve this problem we switched all our connectors' "schema.compatibility" 
> to "BACKWARD", which should only create a new file when a higher schema id is 
> detected and deserialise all logs with the latest known schema id, which 
> should only create one new file during deploys.
> An example connector config:
> {code:java}
> {
> "name": "hdfs-Project_Test_Subject",
> "config": {
> "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
> "partition.duration.ms": "8640",
> "topics.dir": "/user/kafka/Project",
> "hadoop.conf.dir": "/opt/hadoop/conf",
> "flush.size": "100",
> "schema.compatibility": "BACKWARD",
> "topics": "Project_Test_Subject",
> "timezone": "UTC",
> "hdfs.url": "hdfs://hadoophost:9000",
> "value.converter.value.subject.name.strategy": 
> "io.confluent.kafka.serializers.subject.TopicNameStrategy",
> "rotate.interval.ms": "720",
> "locale": "C",
> "hadoop.home": "/opt/hadoop",
> "logs.dir": "/user/kafka/_logs",
> "format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat",
> "partitioner.class": 
> "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
> "name": "hdfs-Project_Test_Subject",
> "errors.tolerance": "all",
> "storage.class": "io.confluent.connect.hdfs.storage.HdfsStorage",
> "path.format": "/MM/dd"
> }
> }{code}
> However, we have lots of enum fields in our data records (avro schemas) to 
> which subjects get added frequently, and this is causing issues with our 
> Kafka Connect connectors FAILING with these kinds of errors:
> {code:java}
> Schema parameters not equal. source parameters: 
> {io.confluent.connect.avro.enum.default.testfield=null, 
> io.confluent.connect.avro.Enum=Ablo.testfield, 
> io.confluent.connect.avro.Enum.null=null, 
> io.confluent.connect.avro.Enum.value1=value1, 
> io.confluent.connect.avro.Enum.value2=value2} and target parameters: 
> {io.confluent.connect.avro.enum.default.testfield=null, 
> io.confluent.connect.avro.Enum=Ablo.testfield, 
> io.confluent.connect.avro.Enum.null=null, 
> io.confluent.connect.avro.Enum.value1=value1, 
> io.confluent.connect.avro.Enum.value2=value2, 
> io.confluent.connect.avro.Enum.value3=value3}{code}
> Since the Avro 1.10.X specification, enum values support defaults, which makes 
> schema evolution possible even when adding subjects (values) to an enum. When 
> testing our schemas for compatibility using the Schema Registry API we always 
> get "is_compatible" => true. So schema evolution should in theory not be a 
> problem.
> The error above is thrown in the *SchemaProjector* class, which is part of 
> Kafka Connect, more specifically 

[jira] [Comment Edited] (KAFKA-13505) Kafka Connect should respect Avro 1.10.X enum defaults spec

2021-12-06 Thread Guus De Graeve (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17453951#comment-17453951
 ] 

Guus De Graeve edited comment on KAFKA-13505 at 12/6/21, 11:53 AM:
---

[~jcustenborder] that would massively help us out. Do you have an example of 
how I can use this transformer in my connector config? Thanks a lot!

Would it be something like below?
{code:java}
"transforms": "NormalizeSchema",
"transforms.NormalizeSchema.type": 
"com.github.jcustenborder.kafka.connect.transform.common.NormalizeSchema",{code}


was (Author: JIRAUSER281185):
[~jcustenborder] that would massively help us out. Do you have an example of 
how I can use this transformer in my connector config? Thanks a lot!

Would it be something like this:
{code:java}
"transforms": "NormalizeSchema",
"transforms.NormalizeSchema.type": 
"com.github.jcustenborder.kafka.connect.transform.common.NormalizeSchema",{code}

> Kafka Connect should respect Avro 1.10.X enum defaults spec
> ---
>
> Key: KAFKA-13505
> URL: https://issues.apache.org/jira/browse/KAFKA-13505
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Guus De Graeve
>Priority: Major
>
> We are using Kafka Connect to pipe data from Kafka topics into parquet files 
> on S3. Our Kafka data is serialised using Avro (schema registry). We use the 
> Amazon S3 Sink Connector for this.
> Up until recently we would set "schema.compatibility" to "NONE" in our 
> connectors, but this had the painful side effect that during deploys of our 
> application we got huge file explosions (lots of very small files in HDFS / 
> S3). This happens because Kafka Connect will create a new file every time the 
> schema id of a log changes compared to the previous log. During deploys of 
> our applications (which can take up to 20 minutes) multiple logs of mixed 
> schema ids are inevitable, and given the huge amounts of logs, file explosions 
> of up to a million files weren't uncommon.
> To solve this problem we switched all our connectors' "schema.compatibility" 
> to "BACKWARD", which should only create a new file when a higher schema id is 
> detected and deserialise all logs with the latest known schema id, which 
> should only create one new file during deploys.
> An example connector config:
> {code:java}
> {
> "name": "hdfs-Project_Test_Subject",
> "config": {
> "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
> "partition.duration.ms": "8640",
> "topics.dir": "/user/kafka/Project",
> "hadoop.conf.dir": "/opt/hadoop/conf",
> "flush.size": "100",
> "schema.compatibility": "BACKWARD",
> "topics": "Project_Test_Subject",
> "timezone": "UTC",
> "hdfs.url": "hdfs://hadoophost:9000",
> "value.converter.value.subject.name.strategy": 
> "io.confluent.kafka.serializers.subject.TopicNameStrategy",
> "rotate.interval.ms": "720",
> "locale": "C",
> "hadoop.home": "/opt/hadoop",
> "logs.dir": "/user/kafka/_logs",
> "format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat",
> "partitioner.class": 
> "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
> "name": "hdfs-Project_Test_Subject",
> "errors.tolerance": "all",
> "storage.class": "io.confluent.connect.hdfs.storage.HdfsStorage",
> "path.format": "/MM/dd"
> }
> }{code}
> However, we have lots of enum fields in our data records (avro schemas) to 
> which subjects get added frequently, and this is causing issues with our 
> Kafka Connect connectors FAILING with these kinds of errors:
> {code:java}
> Schema parameters not equal. source parameters: 
> {io.confluent.connect.avro.enum.default.testfield=null, 
> io.confluent.connect.avro.Enum=Ablo.testfield, 
> io.confluent.connect.avro.Enum.null=null, 
> io.confluent.connect.avro.Enum.value1=value1, 
> io.confluent.connect.avro.Enum.value2=value2} and target parameters: 
> {io.confluent.connect.avro.enum.default.testfield=null, 
> io.confluent.connect.avro.Enum=Ablo.testfield, 
> io.confluent.connect.avro.Enum.null=null, 
> io.confluent.connect.avro.Enum.value1=value1, 
> io.confluent.connect.avro.Enum.value2=value2, 
> io.confluent.connect.avro.Enum.value3=value3}{code}
> Since the Avro 1.10.X specification, enum values support defaults, which makes 
> schema evolution possible even when adding subjects (values) to an enum. When 
> testing our schemas for compatibility using the Schema Registry API we always 
> get "is_compatible" => true. So schema evolution should in theory not be a 
> problem.
> The error above is thrown in the *SchemaProjector* class, which is part of 
> Kafka Connect, more specifically in the 

[jira] [Comment Edited] (KAFKA-13505) Kafka Connect should respect Avro 1.10.X enum defaults spec

2021-12-06 Thread Guus De Graeve (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17453951#comment-17453951
 ] 

Guus De Graeve edited comment on KAFKA-13505 at 12/6/21, 11:50 AM:
---

[~jcustenborder] that would massively help us out. Do you have an example of 
how I can use this transformer in my connector config? Thanks a lot!

Would it be something like this:
{code:java}
"transforms": "NormalizeSchema",
"transforms.NormalizeSchema.type": 
"com.github.jcustenborder.kafka.connect.transform.common.NormalizeSchema",{code}


was (Author: JIRAUSER281185):
[~jcustenborder] that would massively help us out. Do you have an example of 
how I can use this transformer in my connector config? Thanks a lot!

> Kafka Connect should respect Avro 1.10.X enum defaults spec
> ---
>
> Key: KAFKA-13505
> URL: https://issues.apache.org/jira/browse/KAFKA-13505
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Guus De Graeve
>Priority: Major
>
> We are using Kafka Connect to pipe data from Kafka topics into parquet files 
> on S3. Our Kafka data is serialised using Avro (schema registry). We use the 
> Amazon S3 Sink Connector for this.
> Up until recently we would set "schema.compatibility" to "NONE" in our 
> connectors, but this had the painful side effect that during deploys of our 
> application we got huge file explosions (lots of very small files in HDFS / 
> S3). This happens because Kafka Connect will create a new file every time the 
> schema id of a log changes compared to the previous log. During deploys of 
> our applications (which can take up to 20 minutes) multiple logs of mixed 
> schema ids are inevitable, and given the huge amounts of logs, file explosions 
> of up to a million files weren't uncommon.
> To solve this problem we switched all our connectors' "schema.compatibility" 
> to "BACKWARD", which should only create a new file when a higher schema id is 
> detected and deserialise all logs with the latest known schema id, which 
> should only create one new file during deploys.
> An example connector config:
> {code:java}
> {
> "name": "hdfs-Project_Test_Subject",
> "config": {
> "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
> "partition.duration.ms": "8640",
> "topics.dir": "/user/kafka/Project",
> "hadoop.conf.dir": "/opt/hadoop/conf",
> "flush.size": "100",
> "schema.compatibility": "BACKWARD",
> "topics": "Project_Test_Subject",
> "timezone": "UTC",
> "hdfs.url": "hdfs://hadoophost:9000",
> "value.converter.value.subject.name.strategy": 
> "io.confluent.kafka.serializers.subject.TopicNameStrategy",
> "rotate.interval.ms": "720",
> "locale": "C",
> "hadoop.home": "/opt/hadoop",
> "logs.dir": "/user/kafka/_logs",
> "format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat",
> "partitioner.class": 
> "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
> "name": "hdfs-Project_Test_Subject",
> "errors.tolerance": "all",
> "storage.class": "io.confluent.connect.hdfs.storage.HdfsStorage",
> "path.format": "/MM/dd"
> }
> }{code}
> However, we have lots of enum fields in our data records (avro schemas) to 
> which subjects get added frequently, and this is causing issues with our 
> Kafka Connect connectors FAILING with these kinds of errors:
> {code:java}
> Schema parameters not equal. source parameters: 
> {io.confluent.connect.avro.enum.default.testfield=null, 
> io.confluent.connect.avro.Enum=Ablo.testfield, 
> io.confluent.connect.avro.Enum.null=null, 
> io.confluent.connect.avro.Enum.value1=value1, 
> io.confluent.connect.avro.Enum.value2=value2} and target parameters: 
> {io.confluent.connect.avro.enum.default.testfield=null, 
> io.confluent.connect.avro.Enum=Ablo.testfield, 
> io.confluent.connect.avro.Enum.null=null, 
> io.confluent.connect.avro.Enum.value1=value1, 
> io.confluent.connect.avro.Enum.value2=value2, 
> io.confluent.connect.avro.Enum.value3=value3}{code}
> Since the Avro 1.10.X specification, enum values support defaults, which makes 
> schema evolution possible even when adding subjects (values) to an enum. When 
> testing our schemas for compatibility using the Schema Registry API we always 
> get "is_compatible" => true. So schema evolution should in theory not be a 
> problem.
> The error above is thrown in the *SchemaProjector* class, which is part of 
> Kafka Connect, more specifically in the function 
> {*}checkMaybeCompatible(){*}. It seems like this function is not respecting 
> the Avro 1.10.X specification for enum schema evolution, and I'm not sure if 
> it is meant to respect it? As 

[jira] [Comment Edited] (KAFKA-13505) Kafka Connect should respect Avro 1.10.X enum defaults spec

2021-12-03 Thread Guus De Graeve (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17453139#comment-17453139
 ] 

Guus De Graeve edited comment on KAFKA-13505 at 12/3/21, 4:58 PM:
--

[~jcustenborder] I'm not sure whether this reply was aimed at the developers who 
would normally work on this issue or at me. If it was for me, how would I use 
this normalisation within a Kafka Connect connector? Is this done through the 
connector config?

Thanks a lot for your time!


was (Author: JIRAUSER281185):
[~jcustenborder] I'm not sure whether this reply was aimed at the developers who 
would normally work on this issue or at me. If it was for me, how would I use 
this normalisation within a Kafka Connect connector? Is this done through the 
connector config?

> Kafka Connect should respect Avro 1.10.X enum defaults spec
> ---
>
> Key: KAFKA-13505
> URL: https://issues.apache.org/jira/browse/KAFKA-13505
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Guus De Graeve
>Priority: Major
>
> We are using Kafka Connect to pipe data from Kafka topics into parquet files 
> on S3. Our Kafka data is serialised using Avro (schema registry). We use the 
> Amazon S3 Sink Connector for this.
> Up until recently we would set "schema.compatibility" to "NONE" in our 
> connectors, but this had the painful side effect that during deploys of our 
> application we got huge file explosions (lots of very small files in HDFS / 
> S3). This happens because Kafka Connect will create a new file every time the 
> schema id of a log changes compared to the previous log. During deploys of 
> our applications (which can take up to 20 minutes) multiple logs of mixed 
> schema ids are inevitable, and given the huge amounts of logs, file explosions 
> of up to a million files weren't uncommon.
> To solve this problem we switched all our connectors' "schema.compatibility" 
> to "BACKWARD", which should only create a new file when a higher schema id is 
> detected and deserialise all logs with the latest known schema id, which 
> should only create one new file during deploys.
> An example connector config:
> {code:java}
> {
> "name": "hdfs-Project_Test_Subject",
> "config": {
> "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
> "partition.duration.ms": "8640",
> "topics.dir": "/user/kafka/Project",
> "hadoop.conf.dir": "/opt/hadoop/conf",
> "flush.size": "100",
> "schema.compatibility": "BACKWARD",
> "topics": "Project_Test_Subject",
> "timezone": "UTC",
> "hdfs.url": "hdfs://hadoophost:9000",
> "value.converter.value.subject.name.strategy": 
> "io.confluent.kafka.serializers.subject.TopicNameStrategy",
> "rotate.interval.ms": "720",
> "locale": "C",
> "hadoop.home": "/opt/hadoop",
> "logs.dir": "/user/kafka/_logs",
> "format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat",
> "partitioner.class": 
> "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
> "name": "hdfs-Project_Test_Subject",
> "errors.tolerance": "all",
> "storage.class": "io.confluent.connect.hdfs.storage.HdfsStorage",
> "path.format": "/MM/dd"
> }
> }{code}
> However, we have lots of enum fields in our data records (avro schemas) to 
> which subjects get added frequently, and this is causing issues with our 
> Kafka Connect connectors FAILING with these kinds of errors:
> {code:java}
> Schema parameters not equal. source parameters: 
> {io.confluent.connect.avro.enum.default.testfield=null, 
> io.confluent.connect.avro.Enum=Ablo.testfield, 
> io.confluent.connect.avro.Enum.null=null, 
> io.confluent.connect.avro.Enum.value1=value1, 
> io.confluent.connect.avro.Enum.value2=value2} and target parameters: 
> {io.confluent.connect.avro.enum.default.testfield=null, 
> io.confluent.connect.avro.Enum=Ablo.testfield, 
> io.confluent.connect.avro.Enum.null=null, 
> io.confluent.connect.avro.Enum.value1=value1, 
> io.confluent.connect.avro.Enum.value2=value2, 
> io.confluent.connect.avro.Enum.value3=value3}{code}
> Since the Avro 1.10.X specification, enum values support defaults, which makes 
> schema evolution possible even when adding subjects (values) to an enum. When 
> testing our schemas for compatibility using the Schema Registry API we always 
> get "is_compatible" => true. So schema evolution should in theory not be a 
> problem.
> The error above is thrown in the *SchemaProjector* class, which is part of 
> Kafka Connect, more specifically in the function 
> {*}checkMaybeCompatible(){*}. It seems like this function is not respecting 
> the Avro 1.10.X specification for enum schema evolution, and I'm not