[
https://issues.apache.org/jira/browse/KAFKA-6605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Randall Hauch resolved KAFKA-6605.
----------------------------------
Resolution: Fixed
Reviewer: Randall Hauch
Fix Version/s: 2.3.1
2.4.0
2.2.2
2.1.2
2.0.2
1.1.2
1.0.3
> Flatten SMT does not properly handle fields that are null
> ---------------------------------------------------------
>
> Key: KAFKA-6605
> URL: https://issues.apache.org/jira/browse/KAFKA-6605
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 1.0.0, 2.0.0
> Reporter: Randall Hauch
> Assignee: Michal Borowiecki
> Priority: Major
> Fix For: 1.0.3, 1.1.2, 2.0.2, 2.1.2, 2.2.2, 2.4.0, 2.3.1
>
>
> When a message has a null field, the `Flatten` SMT does not properly handle
> this and throws an NPE. Consider this message from Debezium:
> {code}
> {
> "before": null,
> "after": {
> "dbserver1.mydb.team.Value": {
> "id": 1,
> "name": "kafka",
> "email": "[email protected]",
> "last_modified": 1519939449000
> }
> },
> "source": {
> "version": {
> "string": "0.7.3"
> },
> "name": "dbserver1",
> "server_id": 0,
> "ts_sec": 0,
> "gtid": null,
> "file": "mysql-bin.000003",
> "pos": 154,
> "row": 0,
> "snapshot": {
> "boolean": true
> },
> "thread": null,
> "db": {
> "string": "mydb"
> },
> "table": {
> "string": "team"
> }
> },
> "op": "c",
> "ts_ms": {
> "long": 1519939520285
> }
> }
> {code}
> Note how `before` is null; this event represents a row that was INSERTED, so
> there is no `before` state of the row. This results in an NPE:
> {noformat}
> java.lang.NullPointerException
> at org.apache.kafka.connect.transforms.Flatten.buildWithSchema(Flatten.java:219)
> at org.apache.kafka.connect.transforms.Flatten.buildWithSchema(Flatten.java:234)
> at org.apache.kafka.connect.transforms.Flatten.applyWithSchema(Flatten.java:151)
> at org.apache.kafka.connect.transforms.Flatten.apply(Flatten.java:75)
> at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38)
> at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:211)
> at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:187)
> at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
> at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {noformat}
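To illustrate the kind of null handling the report calls for, here is a minimal sketch that flattens nested maps and keeps a null field as a null leaf instead of treating it as a nested structure. It is not the actual fix in Flatten.java: the class name NullSafeFlattenSketch and its methods are hypothetical, it works on plain java.util.Map values, and it does not model the Connect Schema/Struct path where the NPE occurs.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class NullSafeFlattenSketch {

    // Flattens nested maps into a single-level map, joining keys with the delimiter.
    public static Map<String, Object> flatten(Map<String, Object> input, String delimiter) {
        Map<String, Object> out = new LinkedHashMap<>();
        flattenInto(input, "", delimiter, out);
        return out;
    }

    private static void flattenInto(Map<String, Object> node, String prefix,
                                    String delimiter, Map<String, Object> out) {
        for (Map.Entry<String, Object> entry : node.entrySet()) {
            String name = prefix.isEmpty() ? entry.getKey() : prefix + delimiter + entry.getKey();
            Object value = entry.getValue();
            if (value == null) {
                // A null field (such as "before" on an insert) is kept as a null leaf
                // rather than being dereferenced as a nested structure.
                out.put(name, null);
            } else if (value instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> child = (Map<String, Object>) value;
                flattenInto(child, name, delimiter, out);
            } else {
                out.put(name, value);
            }
        }
    }

    public static void main(String[] args) {
        // A cut-down version of the event in the report.
        Map<String, Object> source = new LinkedHashMap<>();
        source.put("name", "dbserver1");
        source.put("gtid", null);

        Map<String, Object> event = new LinkedHashMap<>();
        event.put("before", null);
        event.put("source", source);
        event.put("op", "c");

        System.out.println(flatten(event, "_"));
    }
}
```

The null check comes before the instanceof check only for readability; the point is that a null value never reaches the recursive call.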
> Here's the connector configuration that was used:
> {code}
> {
> "name": "debezium-connector-flatten",
> "config": {
> "connector.class": "io.debezium.connector.mysql.MySqlConnector",
> "tasks.max": "1",
> "database.hostname": "mysql",
> "database.port": "3306",
> "database.user": "debezium",
> "database.password": "dbz",
> "database.server.id": "223345",
> "database.server.name": "dbserver-flatten",
> "database.whitelist": "mydb",
> "database.history.kafka.bootstrap.servers":
> "kafka-1:9092,kafka-2:9092,kafka-3:9092",
> "database.history.kafka.topic": "schema-flatten.mydb",
> "include.schema.changes": "true",
> "transforms": "flatten",
> "transforms.flatten.type":
> "org.apache.kafka.connect.transforms.Flatten$Value",
> "transforms.flatten.delimiter": "_"
> }
> }
> {code}
> Note that the above configuration sets the delimiter to `_`. The default
> delimiter is `.`, which is not a valid character within an Avro field name, so
> using the default with the Avro converter results in the following exception:
> {noformat}
> org.apache.avro.SchemaParseException: Illegal character in: source.version
> at org.apache.avro.Schema.validateName(Schema.java:1151)
> at org.apache.avro.Schema.access$200(Schema.java:81)
> at org.apache.avro.Schema$Field.<init>(Schema.java:403)
> at org.apache.avro.SchemaBuilder$FieldBuilder.completeField(SchemaBuilder.java:2124)
> at org.apache.avro.SchemaBuilder$FieldBuilder.completeField(SchemaBuilder.java:2116)
> at org.apache.avro.SchemaBuilder$FieldBuilder.access$5300(SchemaBuilder.java:2034)
> at org.apache.avro.SchemaBuilder$GenericDefault.withDefault(SchemaBuilder.java:2423)
> at io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:898)
> at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:799)
> at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:652)
> at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:647)
> at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:324)
> at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:75)
> at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:220)
> at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:187)
> at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
> at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {noformat}
> This should probably be addressed in the documentation: when using Avro, set
> the delimiter to `_` or another character that is valid in Avro names
> (letters, digits, and underscores).
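As a rough way to check a delimiter before deploying a connector, the sketch below tests flattened field names against the Avro specification's name rule (first character in [A-Za-z_], remaining characters in [A-Za-z0-9_]). The class AvroDelimiterCheck and its methods are hypothetical helpers written for this illustration; they are not part of Kafka Connect or the Avro library.

```java
import java.util.regex.Pattern;

public class AvroDelimiterCheck {

    // Avro name rule: starts with [A-Za-z_], then only [A-Za-z0-9_].
    private static final Pattern AVRO_NAME = Pattern.compile("[A-Za-z_][A-Za-z0-9_]*");

    public static boolean isValidAvroName(String name) {
        return AVRO_NAME.matcher(name).matches();
    }

    // True if joining two otherwise valid field names with this delimiter
    // still yields a valid Avro name.
    public static boolean isAvroSafeDelimiter(String delimiter) {
        return isValidAvroName("a" + delimiter + "b");
    }

    public static void main(String[] args) {
        System.out.println(isValidAvroName("source.version")); // rejected, as in the trace above
        System.out.println(isValidAvroName("source_version"));
        System.out.println(isAvroSafeDelimiter("."));
        System.out.println(isAvroSafeDelimiter("_"));
    }
}
```

This only checks the delimiter; field names coming from the source system would still need to be valid Avro names on their own.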