[ https://issues.apache.org/jira/browse/KAFKA-6605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Randall Hauch updated KAFKA-6605:
---------------------------------
    Description: 
When a message has a null field, the `Flatten` SMT does not handle it 
properly and throws an NPE. Consider this message from Debezium:

{code}
{
  "before": null,
  "after": {
    "dbserver1.mydb.team.Value": {
      "id": 1,
      "name": "kafka",
      "email": "ka...@apache.org",
      "last_modified": 1519939449000
    }
  },
  "source": {
    "version": {
      "string": "0.7.3"
    },
    "name": "dbserver1",
    "server_id": 0,
    "ts_sec": 0,
    "gtid": null,
    "file": "mysql-bin.000003",
    "pos": 154,
    "row": 0,
    "snapshot": {
      "boolean": true
    },
    "thread": null,
    "db": {
      "string": "mydb"
    },
    "table": {
      "string": "team"
    }
  },
  "op": "c",
  "ts_ms": {
    "long": 1519939520285
  }
}
{code}

Note that `before` is null: this event represents an INSERT, so the row has 
no `before` state. This results in an NPE:

{noformat}
org.apache.avro.SchemaParseException: Illegal character in: source.version
        at org.apache.avro.Schema.validateName(Schema.java:1151)
        at org.apache.avro.Schema.access$200(Schema.java:81)
        at org.apache.avro.Schema$Field.<init>(Schema.java:403)
        at org.apache.avro.SchemaBuilder$FieldBuilder.completeField(SchemaBuilder.java:2124)
        at org.apache.avro.SchemaBuilder$FieldBuilder.completeField(SchemaBuilder.java:2116)
        at org.apache.avro.SchemaBuilder$FieldBuilder.access$5300(SchemaBuilder.java:2034)
        at org.apache.avro.SchemaBuilder$GenericDefault.withDefault(SchemaBuilder.java:2423)
        at io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:898)
        at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:799)
        at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:652)
        at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:647)
        at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:324)
        at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:75)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:220)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:187)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
{noformat}
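
To illustrate the expected behavior, here is a minimal sketch of a null-tolerant flatten over plain Java maps. It is not the `Flatten` SMT code and does not use Connect `Struct` or `Schema` types; the class `NullSafeFlatten` and its methods are hypothetical, and keeping a null field as a single null entry is an assumption about what a fix might do.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class NullSafeFlatten {

    // Recursively copies the entries of a nested map into 'out',
    // joining the key path with 'delimiter'.
    static void flatten(String prefix, Map<String, Object> in,
                        String delimiter, Map<String, Object> out) {
        for (Map.Entry<String, Object> e : in.entrySet()) {
            String name = prefix.isEmpty()
                    ? e.getKey()
                    : prefix + delimiter + e.getKey();
            Object value = e.getValue();
            if (value == null) {
                // Assumption: a null field (such as `before`) is kept as a
                // null entry instead of being dereferenced.
                out.put(name, null);
            } else if (value instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> nested = (Map<String, Object>) value;
                flatten(name, nested, delimiter, out);
            } else {
                out.put(name, value);
            }
        }
    }

    // Convenience entry point that returns a new, insertion-ordered map.
    static Map<String, Object> flatten(Map<String, Object> in, String delimiter) {
        Map<String, Object> out = new LinkedHashMap<>();
        flatten("", in, delimiter, out);
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> source = new LinkedHashMap<>();
        source.put("name", "dbserver1");
        source.put("gtid", null);

        Map<String, Object> event = new LinkedHashMap<>();
        event.put("before", null);
        event.put("source", source);
        event.put("op", "c");

        // Keys produced: before, source_name, source_gtid, op
        System.out.println(flatten(event, "_").keySet());
    }
}
```

The null check comes before the nested-map check, so a null `before` never reaches the recursive call.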

Here's the connector configuration that was used:

{code}
{
    "name": "debezium-connector-flatten",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "223345",
        "database.server.name": "dbserver-flatten",
        "database.whitelist": "mydb",
        "database.history.kafka.bootstrap.servers": "kafka-1:9092,kafka-2:9092,kafka-3:9092",
        "database.history.kafka.topic": "schema-flatten.mydb",
        "include.schema.changes": "true",
        "transforms": "flatten",
        "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
        "transforms.flatten.delimiter": "_"
      }
}
{code}

Note that the above configuration sets the delimiter to `_`. The default 
delimiter is `.`, which is not a valid character in an Avro field name, and 
using it results in the following exception:

{noformat}
org.apache.avro.SchemaParseException: Illegal character in: source.version
        at org.apache.avro.Schema.validateName(Schema.java:1151)
        at org.apache.avro.Schema.access$200(Schema.java:81)
        at org.apache.avro.Schema$Field.<init>(Schema.java:403)
        at org.apache.avro.SchemaBuilder$FieldBuilder.completeField(SchemaBuilder.java:2124)
        at org.apache.avro.SchemaBuilder$FieldBuilder.completeField(SchemaBuilder.java:2116)
        at org.apache.avro.SchemaBuilder$FieldBuilder.access$5300(SchemaBuilder.java:2034)
        at org.apache.avro.SchemaBuilder$GenericDefault.withDefault(SchemaBuilder.java:2423)
        at io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:898)
        at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:799)
        at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:652)
        at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:647)
        at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:324)
        at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:75)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:220)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:187)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
{noformat}

This should probably be addressed in the documentation: when using Avro, set 
the delimiter to `_` or another character that is valid in an Avro name 
(letters, digits, or underscore).
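
The delimiter constraint can be checked up front with a small sketch. The Avro specification requires names to start with `[A-Za-z_]` and continue with `[A-Za-z0-9_]`. The class `AvroNameCheck` and its methods are hypothetical helpers written for illustration; they use only `java.util.regex` and are not part of Avro or Kafka Connect.

```java
import java.util.regex.Pattern;

public class AvroNameCheck {

    // Name rule from the Avro specification: [A-Za-z_][A-Za-z0-9_]*
    private static final Pattern AVRO_NAME =
            Pattern.compile("[A-Za-z_][A-Za-z0-9_]*");

    // Characters that may appear after the first character of a name.
    private static final Pattern NAME_INTERIOR =
            Pattern.compile("[A-Za-z0-9_]+");

    // True if 'name' is a legal Avro field name.
    static boolean isValidAvroName(String name) {
        return name != null && AVRO_NAME.matcher(name).matches();
    }

    // True if 'delimiter' can be placed between two valid names without
    // making the joined name illegal. The delimiter always lands in the
    // interior of the flattened name, so digits are acceptable here.
    static boolean isSafeDelimiter(String delimiter) {
        return delimiter != null && NAME_INTERIOR.matcher(delimiter).matches();
    }

    public static void main(String[] args) {
        System.out.println(isValidAvroName("source.version")); // false
        System.out.println(isValidAvroName("source_version")); // true
        System.out.println(isSafeDelimiter("."));              // false
        System.out.println(isSafeDelimiter("_"));              // true
    }
}
```

A check like this could run when the connector configuration is validated, so an unsuitable delimiter is reported before any record reaches the Avro converter.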

  was:
When a message has a null field, the `Flatten` SMT does not handle it 
properly and throws an NPE. Consider this message from Debezium:

{code}
{
  "before": null,
  "after": {
    "dbserver1.mydb.team.Value": {
      "id": 1,
      "name": "kafka",
      "email": "ka...@apache.org",
      "last_modified": 1519939449000
    }
  },
  "source": {
    "version": {
      "string": "0.7.3"
    },
    "name": "dbserver1",
    "server_id": 0,
    "ts_sec": 0,
    "gtid": null,
    "file": "mysql-bin.000003",
    "pos": 154,
    "row": 0,
    "snapshot": {
      "boolean": true
    },
    "thread": null,
    "db": {
      "string": "mydb"
    },
    "table": {
      "string": "team"
    }
  },
  "op": "c",
  "ts_ms": {
    "long": 1519939520285
  }
}
{code}

Note that `before` is null: this event represents an INSERT, so the row has 
no `before` state. This results in an NPE:

{noformat}
org.apache.avro.SchemaParseException: Illegal character in: source.version
        at org.apache.avro.Schema.validateName(Schema.java:1151)
        at org.apache.avro.Schema.access$200(Schema.java:81)
        at org.apache.avro.Schema$Field.<init>(Schema.java:403)
        at org.apache.avro.SchemaBuilder$FieldBuilder.completeField(SchemaBuilder.java:2124)
        at org.apache.avro.SchemaBuilder$FieldBuilder.completeField(SchemaBuilder.java:2116)
        at org.apache.avro.SchemaBuilder$FieldBuilder.access$5300(SchemaBuilder.java:2034)
        at org.apache.avro.SchemaBuilder$GenericDefault.withDefault(SchemaBuilder.java:2423)
        at io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:898)
        at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:799)
        at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:652)
        at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:647)
        at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:324)
        at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:75)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:220)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:187)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
{noformat}

Here's the connector configuration that was used:

{code}
{
    "name": "debezium-connector-flatten",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "223345",
        "database.server.name": "dbserver-flatten",
        "database.whitelist": "mydb",
        "database.history.kafka.bootstrap.servers": "kafka-1:9092,kafka-2:9092,kafka-3:9092",
        "database.history.kafka.topic": "schema-flatten.mydb",
        "include.schema.changes": "true",
        "transforms": "flatten",
        "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
        "transforms.flatten.delimiter": "_"
      }
}
{code}


> Flatten SMT does not properly handle fields that are null
> ---------------------------------------------------------
>
>                 Key: KAFKA-6605
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6605
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 1.0.0
>            Reporter: Randall Hauch
>            Priority: Major
>
> When a message has a null field, the `Flatten` SMT does not handle it 
> properly and throws an NPE. Consider this message from Debezium:
> {code}
> {
>   "before": null,
>   "after": {
>     "dbserver1.mydb.team.Value": {
>       "id": 1,
>       "name": "kafka",
>       "email": "ka...@apache.org",
>       "last_modified": 1519939449000
>     }
>   },
>   "source": {
>     "version": {
>       "string": "0.7.3"
>     },
>     "name": "dbserver1",
>     "server_id": 0,
>     "ts_sec": 0,
>     "gtid": null,
>     "file": "mysql-bin.000003",
>     "pos": 154,
>     "row": 0,
>     "snapshot": {
>       "boolean": true
>     },
>     "thread": null,
>     "db": {
>       "string": "mydb"
>     },
>     "table": {
>       "string": "team"
>     }
>   },
>   "op": "c",
>   "ts_ms": {
>     "long": 1519939520285
>   }
> }
> {code}
> Note that `before` is null: this event represents an INSERT, so the row has 
> no `before` state. This results in an NPE:
> {noformat}
> org.apache.avro.SchemaParseException: Illegal character in: source.version
>       at org.apache.avro.Schema.validateName(Schema.java:1151)
>       at org.apache.avro.Schema.access$200(Schema.java:81)
>       at org.apache.avro.Schema$Field.<init>(Schema.java:403)
>       at org.apache.avro.SchemaBuilder$FieldBuilder.completeField(SchemaBuilder.java:2124)
>       at org.apache.avro.SchemaBuilder$FieldBuilder.completeField(SchemaBuilder.java:2116)
>       at org.apache.avro.SchemaBuilder$FieldBuilder.access$5300(SchemaBuilder.java:2034)
>       at org.apache.avro.SchemaBuilder$GenericDefault.withDefault(SchemaBuilder.java:2423)
>       at io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:898)
>       at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:799)
>       at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:652)
>       at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:647)
>       at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:324)
>       at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:75)
>       at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:220)
>       at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:187)
>       at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
>       at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
>       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> {noformat}
> Here's the connector configuration that was used:
> {code}
> {
>     "name": "debezium-connector-flatten",
>     "config": {
>         "connector.class": "io.debezium.connector.mysql.MySqlConnector",
>         "tasks.max": "1",
>         "database.hostname": "mysql",
>         "database.port": "3306",
>         "database.user": "debezium",
>         "database.password": "dbz",
>         "database.server.id": "223345",
>         "database.server.name": "dbserver-flatten",
>         "database.whitelist": "mydb",
>         "database.history.kafka.bootstrap.servers": "kafka-1:9092,kafka-2:9092,kafka-3:9092",
>         "database.history.kafka.topic": "schema-flatten.mydb",
>         "include.schema.changes": "true",
>         "transforms": "flatten",
>         "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
>         "transforms.flatten.delimiter": "_"
>       }
> }
> {code}
> Note that the above configuration sets the delimiter to `_`. The default 
> delimiter is `.`, which is not a valid character in an Avro field name, and 
> using it results in the following exception:
> {noformat}
> org.apache.avro.SchemaParseException: Illegal character in: source.version
>       at org.apache.avro.Schema.validateName(Schema.java:1151)
>       at org.apache.avro.Schema.access$200(Schema.java:81)
>       at org.apache.avro.Schema$Field.<init>(Schema.java:403)
>       at org.apache.avro.SchemaBuilder$FieldBuilder.completeField(SchemaBuilder.java:2124)
>       at org.apache.avro.SchemaBuilder$FieldBuilder.completeField(SchemaBuilder.java:2116)
>       at org.apache.avro.SchemaBuilder$FieldBuilder.access$5300(SchemaBuilder.java:2034)
>       at org.apache.avro.SchemaBuilder$GenericDefault.withDefault(SchemaBuilder.java:2423)
>       at io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:898)
>       at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:799)
>       at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:652)
>       at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:647)
>       at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:324)
>       at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:75)
>       at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:220)
>       at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:187)
>       at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
>       at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
>       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> {noformat}
> This should probably be addressed in the documentation: when using Avro, set 
> the delimiter to `_` or another character that is valid in an Avro name 
> (letters, digits, or underscore).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
