[jira] [Commented] (KAFKA-13505) Kafka Connect should respect Avro 1.10.X enum defaults spec

2024-02-14 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817494#comment-17817494
 ] 

Greg Harris commented on KAFKA-13505:
-

Hi [~twbecker], thanks for bringing this issue to our attention. There are a 
couple of reasons this hasn't been addressed:

* The ticket has been inactive, and other more active tickets are getting 
attention from contributors.
* There appears to be a (third-party) workaround, as described above by 
[~jcustenborder], and that repository still appears to be maintained and to 
publish releases.
* The SchemaProjector is a utility class included in the Connect API, but it 
is not used by the framework anywhere, so only some users of some plugins 
experience this fault.
* Enums are not a first-class object in the Connect Schema; they are instead 
implemented by plugins as so-called "Logical Types": a first-class type (e.g. 
STRING) combined with metadata that the plugins understand.

The last one is the most relevant: Connect does an "enum-unaware" comparison 
in the SchemaProjector, because to Connect the enum is just a strange-looking 
string. Rather than do something incorrect, the SchemaProjector declares that 
it does not know how to handle the type and errors out.
What is needed is an "enum-aware" schema projector, which can only be 
implemented by a project that knows what the "enum" is. For Apache Kafka to 
solve this problem as described, it would have to add a special case for this 
one type, which is not a good solution.
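
For illustration, a rough sketch (the parameter names are borrowed from the 
error quoted below; this is not the converter's actual code) of what that 
"strange-looking string" amounts to in Connect terms:
{code:java}
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public class EnumAsStringSketch {
    // Illustrative only: an Avro enum arrives in Connect as a plain STRING schema
    // whose parameters carry the enum metadata that only the plugin understands.
    public static final Schema ENUM_AS_STRING = SchemaBuilder.string()
            .parameter("io.confluent.connect.avro.Enum", "Ablo.testfield")
            .parameter("io.confluent.connect.avro.Enum.value1", "value1")
            .parameter("io.confluent.connect.avro.Enum.value2", "value2")
            .build();
}
{code}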

An alternative would be to add a first-class ENUM type, but that would present 
a migration challenge for the existing enum infrastructure. Another alternative 
is to deprecate and remove this implementation of SchemaProjector and replace 
it with one that is extensible to projecting logical types. This would allow 
the implementors of the ENUM type (or any other higher-level type) to inform 
the SchemaProjector of how it should perform projection. I've opened a ticket 
here: KAFKA-16257
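
To make that second alternative concrete, here is a purely hypothetical sketch 
(this interface does not exist in Kafka; the name and signatures are made up 
for illustration) of the kind of extension point such a projector could expose:
{code:java}
import org.apache.kafka.connect.data.Schema;

// Hypothetical, not an existing Kafka API: a hook that the owner of a logical
// type (e.g. the Avro converter's enum-as-string) could register so that the
// projector delegates compatibility checks and projection for that type.
public interface LogicalTypeProjection {
    /** Whether this extension recognises and can project the given schema pair. */
    boolean canProject(Schema source, Schema target);

    /** Project a value written with the source schema into the shape of the target schema. */
    Object project(Schema source, Object value, Schema target);
}
{code}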

> Kafka Connect should respect Avro 1.10.X enum defaults spec
> ---
>
> Key: KAFKA-13505
> URL: https://issues.apache.org/jira/browse/KAFKA-13505
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Guus De Graeve
>Priority: Major
>
> We are using Kafka Connect to pipe data from Kafka topics into parquet files 
> on S3. Our Kafka data is serialised using Avro (schema registry). We use the 
> Amazon S3 Sink Connector for this.
> Up until recently we would set "schema.compatibility" to "NONE" in our 
> connectors, but this had the painful side-effect that during deploys of our 
> application we got huge file explosions (lots of very small files in HDFS / 
> S3). This happens because Kafka Connect will create a new file every time the 
> schema id of a log changes compared to the previous log. During deploys of 
> our applications (which can take up to 20 minutes) multiple logs with mixed 
> schema ids are inevitable, and given the huge amounts of logs, file explosions 
> of up to a million files weren't uncommon.
> To solve this problem we switched all our connectors' "schema.compatibility" 
> to "BACKWARD", which should only create a new file when a higher schema id is 
> detected and deserialise all logs with the latest known schema id, so only 
> one new file should be created during deploys.
> An example connector config:
> {code:java}
> {
>     "name": "hdfs-Project_Test_Subject",
>     "config": {
>         "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
>         "partition.duration.ms": "8640",
>         "topics.dir": "/user/kafka/Project",
>         "hadoop.conf.dir": "/opt/hadoop/conf",
>         "flush.size": "100",
>         "schema.compatibility": "BACKWARD",
>         "topics": "Project_Test_Subject",
>         "timezone": "UTC",
>         "hdfs.url": "hdfs://hadoophost:9000",
>         "value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicNameStrategy",
>         "rotate.interval.ms": "720",
>         "locale": "C",
>         "hadoop.home": "/opt/hadoop",
>         "logs.dir": "/user/kafka/_logs",
>         "format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat",
>         "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
>         "name": "hdfs-Project_Test_Subject",
>         "errors.tolerance": "all",
>         "storage.class": "io.confluent.connect.hdfs.storage.HdfsStorage",
>         "path.format": "/MM/dd"
>     }
> }{code}
> However, we have lots of enum fields in our data records (avro schemas) to 
> which subjects get added frequently, and this is causing issues with our 
> Kafka Connect connectors FAILING with these kinds of errors:
> {code:java}
> Schema parameters not equal. source parameters: 
> {io.confluent.connect.avro.enum.default.testfield=null, 
> io.confluent.connect.avro.Enum=Ablo.testfield, 
> io.confluent.connect.avro.Enum.null=null, 
> io.confluent.connect.avro.Enum.value1=value1, 
> io.confluent.connect.avro.Enum.value2=value2} and target parameters: 
> {io.confluent.connect.avro.enum.default.testfield=null, 
> io.confluent.connect.avro.Enum=Ablo.testfield, 
> io.confluent.connect.avro.Enum.null=null, 
> io.confluent.connect.avro.Enum.value1=value1, 
> io.confluent.connect.avro.Enum.value2=value2, 
> io.confluent.connect.avro.Enum.value3=value3}{code}
> Since the Avro 1.10.X specification, enums support default values, which makes 
> schema evolution possible even when adding subjects (values) to an enum. When 
> testing our schemas for compatibility using the Schema Registry API we always 
> get "is_compatible" => true, so schema evolution should in theory not be a 
> problem.
> The error above is thrown in the *SchemaProjector* class which is part of 
> Kafka Connect, more specifically in the function 
> {*}checkMaybeCompatible(){*}. It seems like this function is not respecting 
> the Avro 1.10.X specification for enum schema evolution, and I'm not sure if 
> it is meant to respect it. As we currently don't have any other route to fix 
> this issue, and returning to "NONE" schema compatibility is not an option 
> considering the file explosions, we're kinda stuck here.
> This issue was discussed in more detail on the Confluent forum in this thread:
> [https://forum.confluent.io/t/should-will-kafka-connect-support-schema-evolution-using-avro-1-10-x-enum-defaults/3076/8]
> Adem from Confluent is quite confident this is a bug and asked m

[jira] [Commented] (KAFKA-13505) Kafka Connect should respect Avro 1.10.X enum defaults spec

2024-02-14 Thread Tommy Becker (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817385#comment-17817385
 ] 

Tommy Becker commented on KAFKA-13505:
--

I'm wondering if this bug is being overlooked because it's described in terms 
of third-party libraries like Confluent's S3 connector and Avro. But the issue 
is actually in the SchemaProjector class, which is part of Kafka Connect 
itself. Confluent's AvroConverter represents Avro enums in a Connect Schema as 
Strings with "parameters" corresponding to each enum value. Even though these 
parameters seem to be intended only as metadata, 
SchemaProjector.checkMaybeCompatible() requires the parameters of each field to 
*exactly* match, so when values are added to an Avro enum, new parameters are 
added to the generated Connect Schema, breaking this check. Intuitively, it 
feels like this check could be less strict, but there is no specification for 
Connect Schema that I can find, so I don't know whether requiring this 
parameter equality is correct or not.
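
To make that concrete, here is a minimal sketch of how the strict comparison 
surfaces. The schemas are hand-built stand-ins for what the AvroConverter 
would generate (parameter names taken from the error quoted earlier in this 
thread), not the converter's actual output:
{code:java}
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.SchemaProjector;
import org.apache.kafka.connect.errors.SchemaProjectorException;

public class EnumProjectionDemo {
    // Stand-in for the Connect schema the AvroConverter derives from an enum:
    // a plain STRING whose parameters enumerate the symbols.
    static Schema enumSchema(String... symbols) {
        SchemaBuilder builder = SchemaBuilder.string()
                .parameter("io.confluent.connect.avro.Enum", "Ablo.testfield");
        for (String symbol : symbols) {
            builder.parameter("io.confluent.connect.avro.Enum." + symbol, symbol);
        }
        return builder.build();
    }

    public static void main(String[] args) {
        Schema source = enumSchema("value1", "value2");            // writer's schema
        Schema target = enumSchema("value1", "value2", "value3");  // reader's schema with a new symbol
        try {
            SchemaProjector.project(source, "value1", target);
        } catch (SchemaProjectorException e) {
            // checkMaybeCompatible() demands exact parameter equality, so the extra
            // "value3" parameter fails projection even though the data is compatible.
            System.out.println(e.getMessage());
        }
    }
}
{code}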

[jira] [Commented] (KAFKA-13505) Kafka Connect should respect Avro 1.10.X enum defaults spec

2021-12-08 Thread Jeremy Custenborder (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17455964#comment-17455964
 ] 

Jeremy Custenborder commented on KAFKA-13505:
-

Glad you got it working. Sorry about the delay; I totally missed the 
notification from JIRA. 

[jira] [Commented] (KAFKA-13505) Kafka Connect should respect Avro 1.10.X enum defaults spec

2021-12-06 Thread Guus De Graeve (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17454057#comment-17454057
 ] 

Guus De Graeve commented on KAFKA-13505:


[~jcustenborder] we got it to work using your transformer! You have no idea how 
much you helped us out. Thanks a lot.

For reference, this is how we fixed it in our connector configs:

 
{code:java}
...
"transforms": "NormalizeSchema",
"transforms.NormalizeSchema.type": "com.github.jcustenborder.kafka.connect.transform.common.NormalizeSchema$Value",
...
{code}
 

[jira] [Commented] (KAFKA-13505) Kafka Connect should respect Avro 1.10.X enum defaults spec

2021-12-06 Thread Guus De Graeve (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17453951#comment-17453951
 ] 

Guus De Graeve commented on KAFKA-13505:


[~jcustenborder] that would massively help us out. Do you have an example of 
how I can use this transformer in my connector config? Thanks a lot!

[jira] [Commented] (KAFKA-13505) Kafka Connect should respect Avro 1.10.X enum defaults spec

2021-12-03 Thread Jeremy Custenborder (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17453177#comment-17453177
 ] 

Jeremy Custenborder commented on KAFKA-13505:
-

It was aimed at helping with the situation you are currently seeing, since 
I've seen this exact problem before. The Confluent storage connectors check 
for unique schemas, and that's why the files rotate. I was working with 
someone who was having a similar issue. 

[jira] [Commented] (KAFKA-13505) Kafka Connect should respect Avro 1.10.X enum defaults spec

2021-12-03 Thread Guus De Graeve (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17453139#comment-17453139
 ] 

Guus De Graeve commented on KAFKA-13505:


[~jcustenborder] I'm not sure if this reply was aimed at the developers who 
would normally work on this issue or at me? If it was for me, how would I use 
this normalisation within a Kafka Connect connector? Is this done via the 
connector config?

[jira] [Commented] (KAFKA-13505) Kafka Connect should respect Avro 1.10.X enum defaults spec

2021-12-03 Thread Jeremy Custenborder (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17453138#comment-17453138
 ] 

Jeremy Custenborder commented on KAFKA-13505:
-

Not sure if this helps. I ran into a similar issue and put together this 
transformation to normalize incoming schemas to the latest version. It was 
built specifically to reduce small files caused by schema churn. 
[NormalizeSchema|https://github.com/jcustenborder/kafka-connect-transform-common/blob/master/src/main/java/com/github/jcustenborder/kafka/connect/transform/common/NormalizeSchema.java]
