[ https://issues.apache.org/jira/browse/KAFKA-6605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Randall Hauch updated KAFKA-6605: --------------------------------- Description: When a message has a null field, the `Flatten` SMT does not properly handle this and throws an NPE. Consider this message from Debezium: {code} { "before": null, "after": { "dbserver1.mydb.team.Value": { "id": 1, "name": "kafka", "email": "ka...@apache.org", "last_modified": 1519939449000 } }, "source": { "version": { "string": "0.7.3" }, "name": "dbserver1", "server_id": 0, "ts_sec": 0, "gtid": null, "file": "mysql-bin.000003", "pos": 154, "row": 0, "snapshot": { "boolean": true }, "thread": null, "db": { "string": "mydb" }, "table": { "string": "team" } }, "op": "c", "ts_ms": { "long": 1519939520285 } } {code} Note how `before` is null; this event represents a row that was INSERTED and thus there is no `before` state of the row. This results in an NPE: {noformat} org.apache.avro.SchemaParseException: Illegal character in: source.version at org.apache.avro.Schema.validateName(Schema.java:1151) at org.apache.avro.Schema.access$200(Schema.java:81) at org.apache.avro.Schema$Field.<init>(Schema.java:403) at org.apache.avro.SchemaBuilder$FieldBuilder.completeField(SchemaBuilder.java:2124) at org.apache.avro.SchemaBuilder$FieldBuilder.completeField(SchemaBuilder.java:2116) at org.apache.avro.SchemaBuilder$FieldBuilder.access$5300(SchemaBuilder.java:2034) at org.apache.avro.SchemaBuilder$GenericDefault.withDefault(SchemaBuilder.java:2423) at io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:898) at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:799) at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:652) at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:647) at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:324) at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:75) at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:220) at 
org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:187) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) {noformat} Here's the connector configuration that was used: {code} { "name": "debezium-connector-flatten", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "223345", "database.server.name": "dbserver-flatten", "database.whitelist": "mydb", "database.history.kafka.bootstrap.servers": "kafka-1:9092,kafka-2:9092,kafka-3:9092", "database.history.kafka.topic": "schema-flatten.mydb", "include.schema.changes": "true", "transforms": "flatten", "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value", "transforms.flatten.delimiter": "_" } } {code} was: When a message has a null field, the `Flatten` SMT does not properly handle this and throws an NPE. 
Consider this message from Debezium: {code} { "before": null, "after": { "dbserver1.mydb.team.Value": { "id": 1, "name": "kafka", "email": "ka...@apache.org", "last_modified": 1519939449000 } }, "source": { "version": { "string": "0.7.3" }, "name": "dbserver1", "server_id": 0, "ts_sec": 0, "gtid": null, "file": "mysql-bin.000003", "pos": 154, "row": 0, "snapshot": { "boolean": true }, "thread": null, "db": { "string": "mydb" }, "table": { "string": "team" } }, "op": "c", "ts_ms": { "long": 1519939520285 } } {code} Note how `before` is null; this event represents a row that was INSERTED and thus there is no `before` state of the row. This results in an NPE: {noformat} org.apache.avro.SchemaParseException: Illegal character in: source.version at org.apache.avro.Schema.validateName(Schema.java:1151) at org.apache.avro.Schema.access$200(Schema.java:81) at org.apache.avro.Schema$Field.<init>(Schema.java:403) at org.apache.avro.SchemaBuilder$FieldBuilder.completeField(SchemaBuilder.java:2124) at org.apache.avro.SchemaBuilder$FieldBuilder.completeField(SchemaBuilder.java:2116) at org.apache.avro.SchemaBuilder$FieldBuilder.access$5300(SchemaBuilder.java:2034) at org.apache.avro.SchemaBuilder$GenericDefault.withDefault(SchemaBuilder.java:2423) at io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:898) at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:799) at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:652) at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:647) at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:324) at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:75) at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:220) at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:187) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170) at 
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) {noformat} Here's the connector configuration that was used: {code} { "name": "debezium-connector-flatten", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "223345", "database.server.name": "dbserver-flatten", "database.whitelist": "mydb", "database.history.kafka.bootstrap.servers": "kafka-1:9092,kafka-2:9092,kafka-3:9092", "database.history.kafka.topic": "schema-flatten.mydb", "include.schema.changes": "true", "transforms": "flatten", "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value" } } {code} > Flatten SMT does not properly handle fields that are null > --------------------------------------------------------- > > Key: KAFKA-6605 > URL: https://issues.apache.org/jira/browse/KAFKA-6605 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect > Affects Versions: 1.0.0 > Reporter: Randall Hauch > Priority: Major > > When a message has a null field, the `Flatten` SMT does not properly handle > this and throws an NPE. 
Consider this message from Debezium: > {code} > { > "before": null, > "after": { > "dbserver1.mydb.team.Value": { > "id": 1, > "name": "kafka", > "email": "ka...@apache.org", > "last_modified": 1519939449000 > } > }, > "source": { > "version": { > "string": "0.7.3" > }, > "name": "dbserver1", > "server_id": 0, > "ts_sec": 0, > "gtid": null, > "file": "mysql-bin.000003", > "pos": 154, > "row": 0, > "snapshot": { > "boolean": true > }, > "thread": null, > "db": { > "string": "mydb" > }, > "table": { > "string": "team" > } > }, > "op": "c", > "ts_ms": { > "long": 1519939520285 > } > } > {code} > Note how `before` is null; this event represents a row that was INSERTED and thus > there is no `before` state of the row. This results in an NPE: > {noformat} > org.apache.avro.SchemaParseException: Illegal character in: source.version > at org.apache.avro.Schema.validateName(Schema.java:1151) > at org.apache.avro.Schema.access$200(Schema.java:81) > at org.apache.avro.Schema$Field.<init>(Schema.java:403) > at > org.apache.avro.SchemaBuilder$FieldBuilder.completeField(SchemaBuilder.java:2124) > at > org.apache.avro.SchemaBuilder$FieldBuilder.completeField(SchemaBuilder.java:2116) > at > org.apache.avro.SchemaBuilder$FieldBuilder.access$5300(SchemaBuilder.java:2034) > at > org.apache.avro.SchemaBuilder$GenericDefault.withDefault(SchemaBuilder.java:2423) > at > io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:898) > at > io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:799) > at > io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:652) > at > io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:647) > at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:324) > at > io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:75) > at > org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:220) > at > 
org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:187) > at > org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170) > at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {noformat} > Here's the connector configuration that was used: > {code} > { > "name": "debezium-connector-flatten", > "config": { > "connector.class": "io.debezium.connector.mysql.MySqlConnector", > "tasks.max": "1", > "database.hostname": "mysql", > "database.port": "3306", > "database.user": "debezium", > "database.password": "dbz", > "database.server.id": "223345", > "database.server.name": "dbserver-flatten", > "database.whitelist": "mydb", > "database.history.kafka.bootstrap.servers": > "kafka-1:9092,kafka-2:9092,kafka-3:9092", > "database.history.kafka.topic": "schema-flatten.mydb", > "include.schema.changes": "true", > "transforms": "flatten", > "transforms.flatten.type": > "org.apache.kafka.connect.transforms.Flatten$Value", > "transforms.flatten.delimiter": "_" > } > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)