Thanks Rex! This is very helpful. Will check it out later.

On Fri, 20 Nov 2020 at 03:02, Rex Fenley <r...@remind101.com> wrote:

> Below is a highly redacted set of data that should represent the problem.
> As you can see, the "roles" field has "[null]" in it, a null value within
> the array. We also see corresponding rows in our DB, like the following:
>     id     | roles
> -----------+------------
>   16867433 | {NULL}
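>
> For reference, such a row can be produced in Postgres by something along
> these lines (illustrative, assuming a text[] column; our actual table has
> more columns):
>
>     INSERT INTO data (roles) VALUES (ARRAY[NULL]::text[]);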
>
> We have confirmed that if we do not select "roles", all data passes through
> without failure on a single operator, but selecting "roles" eventually and
> repeatedly fails with java.lang.NullPointerException. What is odd is that
> there is no additional stack trace, just the exception, both in our logs
> and in the Flink UI. We only have INFO logging enabled; however, other
> exceptions we've encountered during development have always included a
> stack trace.
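>
> For context, our source table is declared roughly as follows (a simplified
> sketch; the topic and server names are placeholders):
>
> CREATE TABLE data_source (
>   id INT,
>   roles ARRAY<STRING>
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'db.public.data',
>   'properties.bootstrap.servers' = 'kafka:9092',
>   'properties.group.id' = 'flink-consumer',
>   'format' = 'debezium-json'
> );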
>
> {
>   "schema": {
>     "type": "struct",
>     "fields": [
>       {
>         "type": "struct",
>         "fields": [
>           { "type": "int32", "optional": false, "field": "id" },
>           {
>             "type": "array",
>             "items": { "type": "string", "optional": true },
>             "optional": false,
>             "field": "roles"
>           }
>         ],
>         "optional": true,
>         "name": "db.public.data.Value",
>         "field": "before"
>       },
>       {
>         "type": "struct",
>         "fields": [
>           { "type": "int32", "optional": false, "field": "id" },
>           {
>             "type": "array",
>             "items": { "type": "string", "optional": true },
>             "optional": false,
>             "field": "roles"
>           }
>         ],
>         "optional": true,
>         "name": "db.public.data.Value",
>         "field": "after"
>       },
>       {
>         "type": "struct",
>         "fields": [
>           { "type": "string", "optional": false, "field": "version" },
>           { "type": "string", "optional": false, "field": "connector" },
>           { "type": "string", "optional": false, "field": "name" },
>           { "type": "int64", "optional": false, "field": "ts_ms" },
>           {
>             "type": "string",
>             "optional": true,
>             "name": "io.debezium.data.Enum",
>             "version": 1,
>             "parameters": { "allowed": "true,last,false" },
>             "default": "false",
>             "field": "snapshot"
>           },
>           { "type": "string", "optional": false, "field": "db" },
>           { "type": "string", "optional": false, "field": "schema" },
>           { "type": "string", "optional": false, "field": "table" },
>           { "type": "int64", "optional": true, "field": "txId" },
>           { "type": "int64", "optional": true, "field": "lsn" },
>           { "type": "int64", "optional": true, "field": "xmin" }
>         ],
>         "optional": false,
>         "name": "io.debezium.connector.postgresql.Source",
>         "field": "source"
>       },
>       { "type": "string", "optional": false, "field": "op" },
>       { "type": "int64", "optional": true, "field": "ts_ms" },
>       {
>         "type": "struct",
>         "fields": [
>           { "type": "string", "optional": false, "field": "id" },
>           { "type": "int64", "optional": false, "field": "total_order" },
>           {
>             "type": "int64",
>             "optional": false,
>             "field": "data_collection_order"
>           }
>         ],
>         "optional": true,
>         "field": "transaction"
>       }
>     ],
>     "optional": false,
>     "name": "db.public.data.Envelope"
>   },
>   "payload": {
>     "before": null,
>     "after": {
>       "id": 76704,
>       "roles": [null]
>     },
>     "source": {
>       "version": "1.3.0.Final",
>       "connector": "postgresql",
>       "name": "db",
>       "ts_ms": 1605739197360,
>       "snapshot": "true",
>       "db": "db",
>       "schema": "public",
>       "table": "data",
>       "txId": 1784,
>       "lsn": 1305806608,
>       "xmin": null
>     },
>     "op": "r",
>     "ts_ms": 1605739197373,
>     "transaction": null
>   }
> }
>
> cc Brad
>
> On Thu, Nov 19, 2020 at 7:29 AM Dylan Forciea <dy...@oseberg.io> wrote:
>
>> Ah yes, I missed the Kafka part and only saw the array part. FLINK-19771
>> was definitely confined to the Postgres-specific code.
>>
>>
>>
>> Dylan
>>
>>
>>
>> *From: *Jark Wu <imj...@gmail.com>
>> *Date: *Thursday, November 19, 2020 at 9:12 AM
>> *To: *Dylan Forciea <dy...@oseberg.io>
>> *Cc: *Danny Chan <danny0...@apache.org>, Rex Fenley <r...@remind101.com>,
>> Flink ML <user@flink.apache.org>
>> *Subject: *Re: Filter Null in Array in SQL Connector
>>
>>
>>
>> Hi Dylan,
>>
>>
>>
>> I think Rex encountered another issue, because he is using Kafka with the
>> Debezium format.
>>
>>
>>
>> Hi Rex,
>>
>>
>>
>> If you can share the json data and the exception stack, that would be
>> helpful!
>>
>>
>>
>> Besides, you can try to enable 'debezium-json.ignore-parse-errors' option
>> [1] to skip the dirty data.
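>>
>> For example, with a DDL along these lines (a sketch; adjust the table
>> schema and connector options to your setup):
>>
>> CREATE TABLE data_source (
>>   id INT,
>>   roles ARRAY<STRING>
>> ) WITH (
>>   'connector' = 'kafka',
>>   'topic' = 'db.public.data',
>>   'properties.bootstrap.servers' = 'kafka:9092',
>>   'format' = 'debezium-json',
>>   'debezium-json.ignore-parse-errors' = 'true'
>> );
>>
>> Note that with this option enabled, records that fail to parse are dropped
>> silently instead of failing the job.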
>>
>>
>>
>> Best,
>>
>> Jark
>>
>>
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors
>>
>>
>>
>> On Thu, 19 Nov 2020 at 21:13, Dylan Forciea <dy...@oseberg.io> wrote:
>>
>> Do you mean that the array contains values that are null, or that the
>> entire array itself is null? If it’s the latter, I have an issue filed,
>> along with a PR to fix it that has been pending review [1].
>>
>>
>>
>> Regards,
>>
>> Dylan Forciea
>>
>>
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-19771
>>
>>
>>
>> *From: *Danny Chan <danny0...@apache.org>
>> *Date: *Thursday, November 19, 2020 at 2:24 AM
>> *To: *Rex Fenley <r...@remind101.com>
>> *Cc: *Flink ML <user@flink.apache.org>
>> *Subject: *Re: Filter Null in Array in SQL Connector
>>
>>
>>
>> Hi, Fenley ~
>>
>>
>>
>> You are right, parsing null elements of an ARRAY field is not supported
>> yet. I have logged an issue [1] and will fix it soon ~
>>
>>
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-20234
>>
>>
>>
>> Rex Fenley <r...@remind101.com> 于2020年11月19日周四 下午2:51写道:
>>
>> Hi,
>>
>>
>>
>> I recently discovered that some of our data has NULL values arriving in an
>> ARRAY<STRING> column. This column is consumed by Flink via the Kafka
>> connector with the Debezium format. We seem to receive
>> NullPointerExceptions when these NULL values in the arrays arrive, which
>> restarts the source operator in a loop.
>>
>>
>>
>> Is there any way to avoid throwing, or to possibly filter out NULLs in an
>> Array of Strings, in Flink?
>>
>>
>>
>> We're somewhat stuck on how to solve this problem; we'd like to be
>> defensive about it on Flink's side.
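>>
>> If the rows could be parsed at all, we could imagine filtering downstream
>> with something like this sketch (table name hypothetical):
>>
>> SELECT id, role
>> FROM data_source
>> CROSS JOIN UNNEST(roles) AS t (role)
>> WHERE role IS NOT NULL;
>>
>> but since the exception seems to be thrown while deserializing the record,
>> the data never reaches the query.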
>>
>>
>>
>> Thanks!
>>
>>
>>
>> (P.S. The exception was not that informative; there may be room for
>> improvement in the form of a richer error message when this happens.)
>>
>>
>> --
>>
>> *Rex Fenley*  |  Software Engineer - Mobile and Backend
>>
>>
>>
>> *Remind.com* <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>  |  FOLLOW US <https://twitter.com/remindhq>  |  *LIKE US
>> <https://www.facebook.com/remindhq>*
>>
>>
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
>  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>
