[ https://issues.apache.org/jira/browse/FLINK-20234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Danny Chen updated FLINK-20234:
-------------------------------
    Fix Version/s:     (was: 1.12.0)

> Json format supports SE/DE null elements of ARRAY type field
> ------------------------------------------------------------
>
>                 Key: FLINK-20234
>                 URL: https://issues.apache.org/jira/browse/FLINK-20234
>             Project: Flink
>          Issue Type: Improvement
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table SQL / Ecosystem
>    Affects Versions: 1.11.2
>            Reporter: Danny Chen
>            Priority: Major
>
> Reported on the user mailing list:
> Hi,
> I recently discovered that some of our data has NULL values arriving in an
> ARRAY<STRING> column. This column is consumed by Flink via the Kafka
> connector with the Debezium format. We seem to be receiving
> NullPointerExceptions when these NULL values in the arrays arrive, which
> restarts the source operator in a loop.
> Is there any way to avoid throwing, or to filter out NULLs in an array of
> strings, in Flink?
> We're somewhat stuck on how to solve this problem; we'd like to be defensive
> about this on Flink's side.
> Thanks!
> The JSON:
> {code:json}
> {
>   "schema": {
>     "type": "struct",
>     "fields": [
>       {
>         "type": "struct",
>         "fields": [
>           { "type": "int32", "optional": false, "field": "id" },
>           {
>             "type": "array",
>             "items": { "type": "string", "optional": true },
>             "optional": false,
>             "field": "roles"
>           }
>         ],
>         "optional": true,
>         "name": "db.public.data.Value",
>         "field": "before"
>       },
>       {
>         "type": "struct",
>         "fields": [
>           { "type": "int32", "optional": false, "field": "id" },
>           {
>             "type": "array",
>             "items": { "type": "string", "optional": true },
>             "optional": false,
>             "field": "roles"
>           }
>         ],
>         "optional": true,
>         "name": "db.public.data.Value",
>         "field": "after"
>       },
>       {
>         "type": "struct",
>         "fields": [
>           { "type": "string", "optional": false, "field": "version" },
>           { "type": "string", "optional": false, "field": "connector" },
>           { "type": "string", "optional": false, "field": "name" },
>           { "type": "int64", "optional": false, "field": "ts_ms" },
>           {
>             "type": "string",
>             "optional": true,
>             "name": "io.debezium.data.Enum",
>             "version": 1,
>             "parameters": { "allowed": "true,last,false" },
>             "default": "false",
>             "field": "snapshot"
>           },
>           { "type": "string", "optional": false, "field": "db" },
>           { "type": "string", "optional": false, "field": "schema" },
>           { "type": "string", "optional": false, "field": "table" },
>           { "type": "int64", "optional": true, "field": "txId" },
>           { "type": "int64", "optional": true, "field": "lsn" },
>           { "type": "int64", "optional": true, "field": "xmin" }
>         ],
>         "optional": false,
>         "name": "io.debezium.connector.postgresql.Source",
>         "field": "source"
>       },
>       { "type": "string", "optional": false, "field": "op" },
>       { "type": "int64", "optional": true, "field": "ts_ms" },
>       {
>         "type": "struct",
>         "fields": [
>           { "type": "string", "optional": false, "field": "id" },
>           { "type": "int64", "optional": false, "field": "total_order" },
>           {
>             "type": "int64",
>             "optional": false,
>             "field": "data_collection_order"
>           }
>         ],
>         "optional": true,
>         "field": "transaction"
>       }
>     ],
>     "optional": false,
>     "name": "db.public.data.Envelope"
>   },
>   "payload": {
>     "before": null,
>     "after": {
>       "id": 76704,
>       "roles": [null]
>     },
>     "source": {
>       "version": "1.3.0.Final",
>       "connector": "postgresql",
>       "name": "db",
>       "ts_ms": 1605739197360,
>       "snapshot": "true",
>       "db": "db",
>       "schema": "public",
>       "table": "data",
>       "txId": 1784,
>       "lsn": 1305806608,
>       "xmin": null
>     },
>     "op": "r",
>     "ts_ms": 1605739197373,
>     "transaction": null
>   }
> }
> {code}
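
A minimal sketch of the behavior this improvement asks for (serialization and deserialization, SE/DE, of null elements in an ARRAY field), written in plain Java with no Jackson or Flink dependency. All names here (NullableArrayConversion, STRING_CONVERTER, nullable, convertArray, convertArrayDroppingNulls) are hypothetical and are not Flink's actual JSON format classes. A parsed JSON array is modeled as a List in which a JSON null element is a Java null. The sketch contrasts an element converter that dereferences every element (assumed here to be the kind of code path that produces the reported NullPointerException) with one wrapped in a null check, and also shows the filtering policy the reporter asked about.

{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch: none of these names are Flink APIs. A parsed JSON array is
// modeled as a List in which a JSON null element is a Java null.
final class NullableArrayConversion {

    // Element converter that dereferences every element; a null element makes it
    // throw a NullPointerException (the failure mode assumed in this sketch).
    static final Function<Object, String> STRING_CONVERTER = element -> element.toString();

    // Wraps an element converter so a null element maps to null (SQL NULL)
    // instead of being passed to the wrapped converter.
    static <T> Function<Object, T> nullable(Function<Object, T> converter) {
        return element -> element == null ? null : converter.apply(element);
    }

    // Converts every element, keeping null elements in their original positions.
    static <T> List<T> convertArray(List<?> elements, Function<Object, T> elementConverter) {
        List<T> result = new ArrayList<>(elements.size());
        for (Object element : elements) {
            result.add(elementConverter.apply(element));
        }
        return result;
    }

    // Alternative policy from the mailing-list question: skip null elements entirely.
    static <T> List<T> convertArrayDroppingNulls(List<?> elements, Function<Object, T> elementConverter) {
        List<T> result = new ArrayList<>();
        for (Object element : elements) {
            if (element != null) {
                result.add(elementConverter.apply(element));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Models an "after.roles" value with one string and one null element.
        List<String> roles = Arrays.asList("admin", null);
        try {
            convertArray(roles, STRING_CONVERTER);
        } catch (NullPointerException e) {
            System.out.println("unwrapped converter: NullPointerException");
        }
        System.out.println("keep nulls: " + convertArray(roles, nullable(STRING_CONVERTER)));
        System.out.println("drop nulls: " + convertArrayDroppingNulls(roles, STRING_CONVERTER));
    }
}
{code}

Running main prints "unwrapped converter: NullPointerException", then "keep nulls: [admin, null]", then "drop nulls: [admin]". Keeping nulls in place preserves the array's length and element positions, which is closer to what the issue title asks for; dropping them is shown only because the reporter asked whether filtering was possible.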



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
