[ https://issues.apache.org/jira/browse/FLINK-20234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Danny Chen updated FLINK-20234:
-------------------------------
    Fix Version/s:     (was: 1.12.0)

> Json format supports SE/DE null elements of ARRAY type field
> ------------------------------------------------------------
>
>                 Key: FLINK-20234
>                 URL: https://issues.apache.org/jira/browse/FLINK-20234
>             Project: Flink
>          Issue Type: Improvement
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table SQL / Ecosystem
>    Affects Versions: 1.11.2
>            Reporter: Danny Chen
>            Priority: Major
>
> Reported on the USER mailing list:
> Hi,
> I recently discovered some of our data has NULL values arriving in an
> ARRAY<STRING> column. This column is being consumed by Flink via the Kafka
> connector with the Debezium format. We seem to be receiving NullPointerExceptions
> when these NULL values in the arrays arrive, which restarts the source
> operator in a loop.
> Is there any way to not throw, or to possibly filter out NULLs in an array of
> strings in Flink?
> We're somewhat stuck on how to solve this problem; we'd like to be defensive
> about this on Flink's side.
> Thanks!
>
> The JSON:
> {code:java}
> {
>   "schema": {
>     "type": "struct",
>     "fields": [
>       {
>         "type": "struct",
>         "fields": [
>           { "type": "int32", "optional": false, "field": "id" },
>           {
>             "type": "array",
>             "items": { "type": "string", "optional": true },
>             "optional": false,
>             "field": "roles"
>           }
>         ],
>         "optional": true,
>         "name": "db.public.data.Value",
>         "field": "before"
>       },
>       {
>         "type": "struct",
>         "fields": [
>           { "type": "int32", "optional": false, "field": "id" },
>           {
>             "type": "array",
>             "items": { "type": "string", "optional": true },
>             "optional": false,
>             "field": "roles"
>           }
>         ],
>         "optional": true,
>         "name": "db.public.data.Value",
>         "field": "after"
>       },
>       {
>         "type": "struct",
>         "fields": [
>           { "type": "string", "optional": false, "field": "version" },
>           { "type": "string", "optional": false, "field": "connector" },
>           { "type": "string", "optional": false, "field": "name" },
>           { "type": "int64", "optional": false, "field": "ts_ms" },
>           {
>             "type": "string",
>             "optional": true,
>             "name": "io.debezium.data.Enum",
>             "version": 1,
>             "parameters": { "allowed": "true,last,false" },
>             "default": "false",
>             "field": "snapshot"
>           },
>           { "type": "string", "optional": false, "field": "db" },
>           { "type": "string", "optional": false, "field": "schema" },
>           { "type": "string", "optional": false, "field": "table" },
>           { "type": "int64", "optional": true, "field": "txId" },
>           { "type": "int64", "optional": true, "field": "lsn" },
>           { "type": "int64", "optional": true, "field": "xmin" }
>         ],
>         "optional": false,
>         "name": "io.debezium.connector.postgresql.Source",
>         "field": "source"
>       },
>       { "type": "string", "optional": false, "field": "op" },
>       { "type": "int64", "optional": true, "field": "ts_ms" },
>       {
>         "type": "struct",
>         "fields": [
>           { "type": "string", "optional": false, "field": "id" },
>           { "type": "int64", "optional": false, "field": "total_order" },
>           {
>             "type": "int64",
>             "optional": false,
>             "field": "data_collection_order"
>           }
>         ],
>         "optional": true,
>         "field": "transaction"
>       }
>     ],
>     "optional": false,
>     "name": "db.public.data.Envelope"
>   },
>   "payload": {
>     "before": null,
>     "after": {
>       "id": 76704,
>       "roles": [null]
>     },
>     "source": {
>       "version": "1.3.0.Final",
>       "connector": "postgresql",
>       "name": "db",
>       "ts_ms": 1605739197360,
>       "snapshot": "true",
>       "db": "db",
>       "schema": "public",
>       "table": "data",
>       "txId": 1784,
>       "lsn": 1305806608,
>       "xmin": null
>     },
>     "op": "r",
>     "ts_ms": 1605739197373,
>     "transaction": null
>   }
> }
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
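
The issue asks the JSON format to serialize and deserialize (SE/DE) `null` elements inside an ARRAY field, such as `"roles": [null]` in the payload above, instead of failing. The following is a minimal sketch of that behavior, not Flink's actual implementation. It assumes the array elements have already been parsed into a plain `List<Object>`, with a JSON `null` represented as Java `null`. The class `NullTolerantArrayDemo` and its methods are hypothetical illustrations, not part of Flink's JSON format classes. The idea is to convert element by element and keep a `null` element as `null` instead of dereferencing it. The reverse direction writes that element back as a JSON `null` literal.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; not Flink's JSON (de)serialization code.
public class NullTolerantArrayDemo {

    // Deserialization side: map decoded JSON elements to String[],
    // keeping a JSON null as a null element instead of dereferencing it.
    static String[] toStringArray(List<Object> jsonElements) {
        String[] result = new String[jsonElements.size()];
        for (int i = 0; i < jsonElements.size(); i++) {
            Object element = jsonElements.get(i);
            result[i] = (element == null) ? null : element.toString();
        }
        return result;
    }

    // Serialization side: write null elements as the JSON literal null.
    static String toJsonArray(String[] values) {
        StringBuilder sb = new StringBuilder("[");
        for (int i = 0; i < values.length; i++) {
            if (i > 0) {
                sb.append(',');
            }
            if (values[i] == null) {
                sb.append("null");
            } else {
                sb.append('"').append(escape(values[i])).append('"');
            }
        }
        return sb.append(']').toString();
    }

    // Minimal escaping (backslash and double quote only); a real encoder
    // would also escape control characters.
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    public static void main(String[] args) {
        // Mirrors "roles": [null] from the Debezium payload.
        List<Object> roles = Arrays.asList((Object) null);
        String[] decoded = toStringArray(roles);
        System.out.println(Arrays.toString(decoded));
        System.out.println(toJsonArray(decoded));
        System.out.println(toJsonArray(new String[] {"admin", null}));
    }
}
```

Running `main` prints `[null]`, then `[null]`, then `["admin",null]`. Preserving the `null` keeps the SQL semantics of a nullable element type (`"items": { "type": "string", "optional": true }` in the schema). Silently dropping such elements would change the array length.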