[jira] [Commented] (KAFKA-13505) Kafka Connect should respect Avro 1.10.X enum defaults spec

2021-12-06 Thread Guus De Graeve (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17454057#comment-17454057
 ] 

Guus De Graeve commented on KAFKA-13505:


[~jcustenborder] we got it to work using your transformer! You have no idea how 
much you helped us out. Thanks a lot.

For reference, this is how we fixed it in our connector configs:

 
{code:java}
...
"transforms": "NormalizeSchema",
"transforms.NormalizeSchema.type": "com.github.jcustenborder.kafka.connect.transform.common.NormalizeSchema$Value",
...
{code}
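
Putting it together, the relevant part of the full sink connector config looks roughly like this (the surrounding keys are just the ones from the example config in the issue description; the only additions are the two transform lines, and Jeremy Custenborder's transform plugin has to be available on the Connect plugin path):
{code:java}
{
  "name": "hdfs-Project_Test_Subject",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat",
    "schema.compatibility": "BACKWARD",
    "topics": "Project_Test_Subject",
    "transforms": "NormalizeSchema",
    "transforms.NormalizeSchema.type": "com.github.jcustenborder.kafka.connect.transform.common.NormalizeSchema$Value"
  }
}{code}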
 


[jira] [Comment Edited] (KAFKA-13505) Kafka Connect should respect Avro 1.10.X enum defaults spec

2021-12-06 Thread Guus De Graeve (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17453951#comment-17453951
 ] 

Guus De Graeve edited comment on KAFKA-13505 at 12/6/21, 11:54 AM:
---

[~jcustenborder] that would massively help us out. Do you have an example of 
how I can use this transformer in my connector config? Thanks a lot!

Would it be something like below?
{code:java}
...
"transforms": "NormalizeSchema",
"transforms.NormalizeSchema.type": 
"com.github.jcustenborder.kafka.connect.transform.common.NormalizeSchema",
...
{code}


was (Author: JIRAUSER281185):
[~jcustenborder] that would massively help us out. Do you have an example of 
how I can use this transformer in my connector config? Thanks a lot!

Would it be something like below?
{code:java}
"transforms": "NormalizeSchema",
"transforms.NormalizeSchema.type": 
"com.github.jcustenborder.kafka.connect.transform.common.NormalizeSchema",{code}


[jira] [Comment Edited] (KAFKA-13505) Kafka Connect should respect Avro 1.10.X enum defaults spec

2021-12-06 Thread Guus De Graeve (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17453951#comment-17453951
 ] 

Guus De Graeve edited comment on KAFKA-13505 at 12/6/21, 11:53 AM:
---

[~jcustenborder] that would massively help us out. Do you have an example of 
how I can use this transformer in my connector config? Thanks a lot!

Would it be something like below?
{code:java}
"transforms": "NormalizeSchema",
"transforms.NormalizeSchema.type": 
"com.github.jcustenborder.kafka.connect.transform.common.NormalizeSchema",{code}


was (Author: JIRAUSER281185):
[~jcustenborder] that would massively help us out. Do you have an example of 
how I can use this transformer in my connector config? Thanks a lot!

Would it be something like this:
{code:java}
"transforms": "NormalizeSchema",
"transforms.NormalizeSchema.type": 
"com.github.jcustenborder.kafka.connect.transform.common.NormalizeSchema",{code}


[jira] [Comment Edited] (KAFKA-13505) Kafka Connect should respect Avro 1.10.X enum defaults spec

2021-12-06 Thread Guus De Graeve (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17453951#comment-17453951
 ] 

Guus De Graeve edited comment on KAFKA-13505 at 12/6/21, 11:50 AM:
---

[~jcustenborder] that would massively help us out. Do you have an example of 
how I can use this transformer in my connector config? Thanks a lot!

Would it be something like this:
{code:java}
"transforms": "NormalizeSchema",
"transforms.NormalizeSchema.type": 
"com.github.jcustenborder.kafka.connect.transform.common.NormalizeSchema",{code}


was (Author: JIRAUSER281185):
[~jcustenborder] that would massively help us out. Do you have an example of 
how I can use this transformer in my connector config? Thanks a lot!


[jira] [Commented] (KAFKA-13505) Kafka Connect should respect Avro 1.10.X enum defaults spec

2021-12-06 Thread Guus De Graeve (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17453951#comment-17453951
 ] 

Guus De Graeve commented on KAFKA-13505:


[~jcustenborder] that would massively help us out. Do you have an example of 
how I can use this transformer in my connector config? Thanks a lot!


[jira] [Comment Edited] (KAFKA-13505) Kafka Connect should respect Avro 1.10.X enum defaults spec

2021-12-03 Thread Guus De Graeve (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17453139#comment-17453139
 ] 

Guus De Graeve edited comment on KAFKA-13505 at 12/3/21, 4:58 PM:
--

[~jcustenborder] I'm not sure whether this reply was aimed at the developers 
who would normally work on this issue or at me. If it was meant for me, how 
would I use this normalisation within a Kafka Connect connector? Is this done 
using the connector config?

Thanks a lot for your time!


was (Author: JIRAUSER281185):
[~jcustenborder] I'm not sure whether this reply was aimed at the developers 
who would normally work on this issue or at me. If it was meant for me, how 
would I use this normalisation within a Kafka Connect connector? Is this done 
using the connector config?


[jira] [Commented] (KAFKA-13505) Kafka Connect should respect Avro 1.10.X enum defaults spec

2021-12-03 Thread Guus De Graeve (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17453139#comment-17453139
 ] 

Guus De Graeve commented on KAFKA-13505:


[~jcustenborder] I'm not sure whether this reply was aimed at the developers 
who would normally work on this issue or at me. If it was meant for me, how 
would I use this normalisation within a Kafka Connect connector? Is this done 
using the connector config?


[jira] [Updated] (KAFKA-13505) Kafka Connect should respect Avro 1.10.X enum defaults spec

2021-12-03 Thread Guus De Graeve (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guus De Graeve updated KAFKA-13505:
---
Description: 
We are using Kafka Connect to pipe data from Kafka topics into parquet files on 
S3. Our Kafka data is serialised using Avro (schema registry). We use the 
Amazon S3 Sink Connector for this.

Up until recently we would set "schema.compatibility" to "NONE" in our 
connectors, but this had the painful side effect that during deploys of our 
application we got huge file explosions (lots of very small files in HDFS / 
S3). This happens because Kafka Connect creates a new file every time the 
schema id of a log changes compared to the previous log. During deploys of our 
applications (which can take up to 20 minutes), logs with mixed schema ids are 
inevitable, and given the huge volume of logs, file explosions of up to a 
million files weren't uncommon.

To solve this problem we switched all our connectors' "schema.compatibility" to 
"BACKWARD", which should only create a new file when a higher schema id is 
detected and deserialise all logs with the latest known schema id, so only one 
new file should be created during deploys.

An example connector config:
{code:java}
{
  "name": "hdfs-Project_Test_Subject",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "partition.duration.ms": "8640",
    "topics.dir": "/user/kafka/Project",
    "hadoop.conf.dir": "/opt/hadoop/conf",
    "flush.size": "100",
    "schema.compatibility": "BACKWARD",
    "topics": "Project_Test_Subject",
    "timezone": "UTC",
    "hdfs.url": "hdfs://hadoophost:9000",
    "value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicNameStrategy",
    "rotate.interval.ms": "720",
    "locale": "C",
    "hadoop.home": "/opt/hadoop",
    "logs.dir": "/user/kafka/_logs",
    "format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "name": "hdfs-Project_Test_Subject",
    "errors.tolerance": "all",
    "storage.class": "io.confluent.connect.hdfs.storage.HdfsStorage",
    "path.format": "/MM/dd"
  }
}{code}
However, we have lots of enum fields in our data records (Avro schemas) to 
which values get added frequently, and this is causing our Kafka Connect 
connectors to fail with these kinds of errors:
{code:java}
Schema parameters not equal. source parameters: 
{io.confluent.connect.avro.enum.default.testfield=null, 
io.confluent.connect.avro.Enum=Ablo.testfield, 
io.confluent.connect.avro.Enum.null=null, 
io.confluent.connect.avro.Enum.value1=value1, 
io.confluent.connect.avro.Enum.value2=value2} and target parameters: 
{io.confluent.connect.avro.enum.default.testfield=null, 
io.confluent.connect.avro.Enum=Ablo.testfield, 
io.confluent.connect.avro.Enum.null=null, 
io.confluent.connect.avro.Enum.value1=value1, 
io.confluent.connect.avro.Enum.value2=value2, 
io.confluent.connect.avro.Enum.value3=value3}{code}
Since the Avro 1.10.X specification, enums support default values, which makes 
schema evolution possible even when adding values (symbols) to an enum. When 
testing our schemas for compatibility using the Schema Registry API we always 
get "is_compatible" => true, so schema evolution should in theory not be a 
problem.
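
For reference, an enum with a default value as described in the Avro 1.10.X spec looks roughly like this (the field and symbol names below are made up to mirror the error above, not our real schema):
{code:java}
{
  "name": "testfield",
  "type": {
    "type": "enum",
    "name": "TestField",
    "symbols": ["value1", "value2", "value3"],
    "default": "value1"
  }
}{code}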

The error above is thrown in the *SchemaProjector* class, which is part of Kafka 
Connect, more specifically in the function {*}checkMaybeCompatible(){*}. It 
seems like this function does not respect the Avro 1.10.X specification for 
enum schema evolution, and I'm not sure whether it is meant to. As we currently 
don't have any other route to fix this issue, and returning to "NONE" schema 
compatibility is not an option considering the file explosions, we're stuck 
here.

This issue was discussed in more detail on the Confluent forum in this thread:

[https://forum.confluent.io/t/should-will-kafka-connect-support-schema-evolution-using-avro-1-10-x-enum-defaults/3076/8]

Adem from Confluent is quite confident this is a bug and asked me to file a bug 
report here.


[jira] [Created] (KAFKA-13505) Kafka Connect should respect Avro 1.10.X enum defaults spec

2021-12-03 Thread Guus De Graeve (Jira)
Guus De Graeve created KAFKA-13505:
--

 Summary: Kafka Connect should respect Avro 1.10.X enum defaults 
spec
 Key: KAFKA-13505
 URL: https://issues.apache.org/jira/browse/KAFKA-13505
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Guus De Graeve


We are using Kafka Connect to pipe data from Kafka topics into parquet files 
on S3. Our Kafka data is serialised using Avro (schema registry). We use the 
Amazon S3 Sink Connector for this.

Up until recently we would set "schema.compatibility" to "NONE" in our 
connectors, but this had the painful side effect that during deploys of our 
application we got huge file explosions (lots of very small files in HDFS / 
S3). This happens because Kafka Connect creates a new file every time the 
schema id of a log changes compared to the previous log. During deploys of our 
applications (which can take up to 20 minutes), logs with mixed schema ids are 
inevitable, and given the huge volume of logs, file explosions of up to a 
million files weren't uncommon.

To solve this problem we switched all our connectors' "schema.compatibility" to 
"BACKWARD", which should only create a new file when a higher schema id is 
detected and deserialise all logs with the latest known schema id, so only one 
new file should be created during deploys.

An example connector config:
{code:java}
{
  "name": "hdfs-Project_Test_Subject",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "partition.duration.ms": "8640",
    "topics.dir": "/user/kafka/Project",
    "hadoop.conf.dir": "/opt/hadoop/conf",
    "flush.size": "100",
    "schema.compatibility": "BACKWARD",
    "topics": "Project_Test_Subject",
    "timezone": "UTC",
    "hdfs.url": "hdfs://hadoophost:9000",
    "value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicNameStrategy",
    "rotate.interval.ms": "720",
    "locale": "C",
    "hadoop.home": "/opt/hadoop",
    "logs.dir": "/user/kafka/_logs",
    "format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "name": "hdfs-Project_Test_Subject",
    "errors.tolerance": "all",
    "storage.class": "io.confluent.connect.hdfs.storage.HdfsStorage",
    "path.format": "/MM/dd"
  }
}{code}
However, we have lots of enum fields in our data records (Avro schemas) to 
which values get added frequently, and this is causing our Kafka Connect 
connectors to fail with these kinds of errors:
{code:java}
Schema parameters not equal. source parameters: 
{io.confluent.connect.avro.enum.default.testfield=null, 
io.confluent.connect.avro.Enum=Ablo.testfield, 
io.confluent.connect.avro.Enum.null=null, 
io.confluent.connect.avro.Enum.value1=value1, 
io.confluent.connect.avro.Enum.value2=value2} and target parameters: 
{io.confluent.connect.avro.enum.default.testfield=null, 
io.confluent.connect.avro.Enum=Ablo.testfield, 
io.confluent.connect.avro.Enum.null=null, 
io.confluent.connect.avro.Enum.value1=value1, 
io.confluent.connect.avro.Enum.value2=value2, 
io.confluent.connect.avro.Enum.value3=value3}{code}
Since the Avro 1.10.X specification, enums support default values, which makes 
schema evolution possible even when adding values (symbols) to an enum. When 
testing our schemas for compatibility using the Schema Registry API we always 
get "is_compatible" => true, so schema evolution should in theory not be a 
problem.

The error above is thrown in the `SchemaProjector` class, which is part of Kafka 
Connect, more specifically in the function `checkMaybeCompatible()`. It seems 
like this function does not respect the Avro 1.10.X specification for enum 
schema evolution, and I'm not sure whether it is meant to. As we currently 
don't have any other route to fix this issue, and returning to "NONE" schema 
compatibility is not an option considering the file explosions, we're stuck 
here.

This issue was discussed in more detail on the Confluent forum in this thread:

[https://forum.confluent.io/t/should-will-kafka-connect-support-schema-evolution-using-avro-1-10-x-enum-defaults/3076/8]

Adem from Confluent is quite confident this is a bug and asked me to file a bug 
report here.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)