I checked with the following JSON:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          { "type": "int32", "optional": false, "field": "id" },
          {
            "type": "array",
            "items": { "type": "string", "optional": true },
            "optional": false,
            "field": "roles"
          }
        ],
        "optional": true,
        "name": "db.public.data.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          { "type": "int32", "optional": false, "field": "id" },
          {
            "type": "array",
            "items": { "type": "string", "optional": true },
            "optional": false,
            "field": "roles"
          }
        ],
        "optional": true,
        "name": "db.public.data.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          { "type": "string", "optional": false, "field": "version" },
          { "type": "string", "optional": false, "field": "connector" },
          { "type": "string", "optional": false, "field": "name" },
          { "type": "int64", "optional": false, "field": "ts_ms" },
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Enum",
            "version": 1,
            "parameters": { "allowed": "true,last,false" },
            "default": "false",
            "field": "snapshot"
          },
          { "type": "string", "optional": false, "field": "db" },
          { "type": "string", "optional": false, "field": "schema" },
          { "type": "string", "optional": false, "field": "table" },
          { "type": "int64", "optional": true, "field": "txId" },
          { "type": "int64", "optional": true, "field": "lsn" },
          { "type": "int64", "optional": true, "field": "xmin" }
        ],
        "optional": false,
        "name": "io.debezium.connector.postgresql.Source",
        "field": "source"
      },
      { "type": "string", "optional": false, "field": "op" },
      { "type": "int64", "optional": true, "field": "ts_ms" },
      {
        "type": "struct",
        "fields": [
          { "type": "string", "optional": false, "field": "id" },
          { "type": "int64", "optional": false, "field": "total_order" },
          { "type": "int64", "optional": false, "field": "data_collection_order" }
        ],
        "optional": true,
        "field": "transaction"
      }
    ],
    "optional": false,
    "name": "db.public.data.Envelope"
  },
  "payload": {
    "before": null,
    "after": { "id": 76704, "roles": [null] },
    "source": {
      "version": "1.3.0.Final",
      "connector": "postgresql",
      "name": "db",
      "ts_ms": 1605739197360,
      "snapshot": "true",
      "db": "db",
      "schema": "public",
      "table": "data",
      "txId": 1784,
      "lsn": 1305806608,
      "xmin": null
    },
    "op": "r",
    "ts_ms": 1605739197373,
    "transaction": null
  }
}
This works correctly. I reformatted it because the original was not valid JSON.

Rex Fenley <r...@remind101.com> wrote on Fri, Nov 20, 2020 at 3:02 AM:

> Below is a highly redacted set of data that should represent the problem.
> As you can see, the "roles" field has "[null]" in it, a null value within
> the array. We also see corresponding rows in our DB like the following:
>
> id        | roles
> ----------+--------
> 16867433  | {NULL}
>
> We have confirmed that by not selecting "roles", all data passes through
> without failure on a single operator, but selecting "roles" will
> eventually always fail with java.lang.NullPointerException, repeatedly.
> What is odd about this is that there is no additional stack trace, just
> the exception, in our logs and in the Flink UI. We only have INFO logging
> on; however, other exceptions we've encountered in our development have
> always included a stack trace.
>
> {
>   "schema": {
>     "type": "struct",
>     "fields": [
>       {
>         "type": "struct",
>         "fields": [
>           { "type": "int32", "optional": false, "field": "id" },
>           {
>             "type": "array",
>             "items": { "type": "string", "optional": true },
>             "optional": false,
>             "field": "roles"
>           },
>         ],
>         "optional": true,
>         "name": "db.public.data.Value",
>         "field": "before"
>       },
>       {
>         "type": "struct",
>         "fields": [
>           { "type": "int32", "optional": false, "field": "id" },
>           {
>             "type": "array",
>             "items": { "type": "string", "optional": true },
>             "optional": false,
>             "field": "roles"
>           },
>         ],
>         "optional": true,
>         "name": "db.public.data.Value",
>         "field": "after"
>       },
>       {
>         "type": "struct",
>         "fields": [
>           { "type": "string", "optional": false, "field": "version" },
>           { "type": "string", "optional": false, "field": "connector" },
>           { "type": "string", "optional": false, "field": "name" },
>           { "type": "int64", "optional": false, "field": "ts_ms" },
>           {
>             "type": "string",
>             "optional": true,
>             "name": "io.debezium.data.Enum",
>             "version": 1,
>             "parameters": { "allowed": "true,last,false" },
>             "default": "false",
>             "field": "snapshot"
>           },
>           { "type": "string", "optional": false, "field": "db" },
>           { "type": "string", "optional": false, "field": "schema" },
>           { "type": "string", "optional": false, "field": "table" },
>           { "type": "int64", "optional": true, "field": "txId" },
>           { "type": "int64", "optional": true, "field": "lsn" },
>           { "type": "int64", "optional": true, "field": "xmin" }
>         ],
>         "optional": false,
>         "name": "io.debezium.connector.postgresql.Source",
>         "field": "source"
>       },
>       { "type": "string", "optional": false, "field": "op" },
>       { "type": "int64", "optional": true, "field": "ts_ms" },
>       {
>         "type": "struct",
>         "fields": [
>           { "type": "string", "optional": false, "field": "id" },
>           { "type": "int64", "optional": false, "field": "total_order" },
>           {
>             "type": "int64",
>             "optional": false,
>             "field": "data_collection_order"
>           }
>         ],
>         "optional": true,
>         "field": "transaction"
>       }
>     ],
>     "optional": false,
>     "name": "db.public.data.Envelope"
>   },
>   "payload": {
>     "before": null,
>     "after": {
>       "id": 76704,
>       "roles": [null],
>     },
>     "source": {
>       "version": "1.3.0.Final",
>       "connector": "postgresql",
>       "name": "db",
>       "ts_ms": 1605739197360,
>       "snapshot": "true",
>       "db": "db",
>       "schema": "public",
>       "table": "data",
>       "txId": 1784,
>       "lsn": 1305806608,
>       "xmin": null
>     },
>     "op": "r",
>     "ts_ms": 1605739197373,
>     "transaction": null
>   }
> }
>
> cc Brad
>
> On Thu, Nov 19, 2020 at 7:29 AM Dylan Forciea <dy...@oseberg.io> wrote:
>
>> Ah yes, missed the Kafka part and just saw the array part. FLINK-19771
>> was definitely solely in the Postgres-specific code.
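(As an aside, the `[null]` element in the payload above is easy to inspect outside Flink. A minimal Python sketch, standard library only; the envelope below is trimmed to just the fields involved in the failure:)

```python
import json

# Trimmed Debezium change event from the message above; only the
# fields relevant to the "roles" problem are kept.
message = """
{
  "payload": {
    "before": null,
    "after": { "id": 76704, "roles": [null] },
    "op": "r"
  }
}
"""

envelope = json.loads(message)
roles = envelope["payload"]["after"]["roles"]

# The array itself is present and non-null; it is the element inside
# it that is null, which matches the DB row {NULL} quoted above.
print(roles)             # [None]
print(roles[0] is None)  # True
```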
>>
>> Dylan
>>
>> *From:* Jark Wu <imj...@gmail.com>
>> *Date:* Thursday, November 19, 2020 at 9:12 AM
>> *To:* Dylan Forciea <dy...@oseberg.io>
>> *Cc:* Danny Chan <danny0...@apache.org>, Rex Fenley <r...@remind101.com>, Flink ML <user@flink.apache.org>
>> *Subject:* Re: Filter Null in Array in SQL Connector
>>
>> Hi Dylan,
>>
>> I think Rex encountered another issue, because he is using Kafka with
>> the Debezium format.
>>
>> Hi Rex,
>>
>> If you can share the JSON data and the exception stack, that would be
>> helpful!
>>
>> Besides, you can try enabling the 'debezium-json.ignore-parse-errors'
>> option [1] to skip the dirty data.
>>
>> Best,
>> Jark
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors
>>
>> On Thu, 19 Nov 2020 at 21:13, Dylan Forciea <dy...@oseberg.io> wrote:
>>
>> Do you mean that the array contains values that are null, or that the
>> entire array itself is null? If it's the latter, I have an issue
>> written, along with a PR to fix it that has been pending review [1].
>>
>> Regards,
>> Dylan Forciea
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-19771
>>
>> *From:* Danny Chan <danny0...@apache.org>
>> *Date:* Thursday, November 19, 2020 at 2:24 AM
>> *To:* Rex Fenley <r...@remind101.com>
>> *Cc:* Flink ML <user@flink.apache.org>
>> *Subject:* Re: Filter Null in Array in SQL Connector
>>
>> Hi, Fenley ~
>>
>> You are right, parsing nulls in an ARRAY field is not supported right
>> now; I have logged an issue [1] and will fix it soon ~
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-20234
>>
>> Rex Fenley <r...@remind101.com> wrote on Thu, Nov 19, 2020 at 2:51 PM:
>>
>> Hi,
>>
>> I recently discovered that some of our data has NULL values arriving in
>> an ARRAY<STRING> column.
>> This column is being consumed by Flink via the Kafka connector's
>> Debezium format. We seem to be receiving NullPointerExceptions when
>> these NULL values in the arrays arrive, which restarts the source
>> operator in a loop.
>>
>> Is there any way to not throw, or to filter out NULLs in an array of
>> strings in Flink?
>>
>> We're somewhat stuck on how to solve this problem; we'd like to be
>> defensive about this on Flink's side.
>>
>> Thanks!
>>
>> (P.S. The exception was not that informative; there may be room for
>> improvement in terms of a richer error message when this happens.)
>>
>> --
>> *Rex Fenley* | Software Engineer - Mobile and Backend
>>
>> *Remind.com* <https://www.remind.com/> | BLOG <http://blog.remind.com/> | FOLLOW US <https://twitter.com/remindhq> | *LIKE US <https://www.facebook.com/remindhq>*
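Until the null-handling fixes tracked above land, one defensive option is to strip NULL elements from the array before the events ever reach Flink, in whatever step sits between the database and the Kafka consumer. A minimal Python sketch of such a pre-filter; the function name `strip_null_roles` and the hard-coded "roles" field are illustrative assumptions, not part of any Flink or Debezium API:

```python
import json

def strip_null_roles(raw: str) -> str:
    """Drop null elements from the 'roles' arrays of a Debezium
    change event, in both the before and after row images."""
    event = json.loads(raw)
    payload = event.get("payload") or {}
    for image in ("before", "after"):
        row = payload.get(image)
        # A missing or null row image (e.g. "before" on a snapshot read)
        # is left untouched; only a present list is filtered.
        if row and isinstance(row.get("roles"), list):
            row["roles"] = [r for r in row["roles"] if r is not None]
    return json.dumps(event)

# The 'after' image from the thread, with its single null element.
raw = '{"payload": {"before": null, "after": {"id": 76704, "roles": [null]}, "op": "r"}}'
print(strip_null_roles(raw))
```

This trades data fidelity for availability: the NULL entries are silently dropped, which matches the "filter out NULLs" behavior asked for above, but the cleaner long-term fix is the connector-side null support tracked in the FLINK issues linked earlier in the thread.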