Re: Filter Null in Array in SQL Connector

2020-12-01 Thread Rex Fenley
>>>>>> "optional": false,
>>>>>> "field": "roles"
>>>>>>   },
>>>>>> ],
>>>>>> "optional": true,
>>>>>> "name": "db.public.data.Value",
>>>>>> "field": "before"
>>>>>>   },
>>>>>>   {
>>>>>> "type": "struct",
>>>>>> "fields": [
>>>>>>   { "type": "int32", "optional": false, "field": "id" },
>>>>>>   {
>>>>>> "type": "array",
>>>>>> "items": { "type": "string", "optional": true },
>>>>>> "optional": false,
>>>>>> "field": "roles"
>>>>>>   },
>>>>>> ],
>>>>>> "optional": true,
>>>>>> "name": "db.public.data.Value",
>>>>>> "field": "after"
>>>>>>   },
>>>>>>   {
>>>>>> "type": "struct",
>>>>>> "fields": [
>>>>>>   { "type": "string", "optional": false, "field": "version" },
>>>>>>   { "type": "string", "optional": false, "field": "connector" },
>>>>>>   { "type": "string", "optional": false, "field": "name" },
>>>>>>   { "type": "int64", "optional": false, "field": "ts_ms" },
>>>>>>   {
>>>>>> "type": "string",
>>>>>> "optional": true,
>>>>>> "name": "io.debezium.data.Enum",
>>>>>> "version": 1,
>>>>>> "parameters": { "allowed": "true,last,false" },
>>>>>> "default": "false",
>>>>>> "field": "snapshot"
>>>>>>   },
>>>>>>   { "type": "string", "optional": false, "field": "db" },
>>>>>>   { "type": "string", "optional": false, "field": "schema" },
>>>>>>   { "type": "string", "optional": false, "field": "table" },
>>>>>>   { "type": "int64", "optional": true, "field": "txId" },
>>>>>>   { "type": "int64", "optional": true, "field": "lsn" },
>>>>>>   { "type": "int64", "optional": true, "field": "xmin" }
>>>>>> ],
>>>>>> "optional": false,
>>>>>> "name": "io.debezium.connector.postgresql.Source",
>>>>>> "field": "source"
>>>>>>   },
>>>>>>   { "type": "string", "optional": false, "field": "op" },
>>>>>>   { "type": "int64", "optional": true, "field": "ts_ms" },
>>>>>>   {
>>>>>> "type": "struct",
>>>>>> "fields": [
>>>>>>   { "type": "string", "optional": false, "field": "id" },
>>>>>>   { "type": "int64", "optional": false, "field": "total_order" },
>>>>>>   {
>>>>>> "type": "int64",
>>>>>> "optional": false,
>>>>>> "field": "data_collection_order"
>>>>>>

Re: Filter Null in Array in SQL Connector

2020-12-01 Thread Danny Chan
>>>>> "items": { "type": "string", "optional": true },
>>>>> "optional": false,
>>>>> "field": "roles"
>>>>>   },
>>>>> ],
>>>>> "optional": true,
>>>>> "name": "db.public.data.Value",
>>>>> "field": "after"
>>>>>   },
>>>>>   {
>>>>> "type": "struct",
>>>>> "fields": [
>>>>>   { "type": "string", "optional": false, "field": "version" },
>>>>>   { "type": "string", "optional": false, "field": "connector" },
>>>>>   { "type": "string", "optional": false, "field": "name" },
>>>>>   { "type": "int64", "optional": false, "field": "ts_ms" },
>>>>>   {
>>>>> "type": "string",
>>>>> "optional": true,
>>>>> "name": "io.debezium.data.Enum",
>>>>> "version": 1,
>>>>> "parameters": { "allowed": "true,last,false" },
>>>>> "default": "false",
>>>>> "field": "snapshot"
>>>>>   },
>>>>>   { "type": "string", "optional": false, "field": "db" },
>>>>>   { "type": "string", "optional": false, "field": "schema" },
>>>>>   { "type": "string", "optional": false, "field": "table" },
>>>>>   { "type": "int64", "optional": true, "field": "txId" },
>>>>>   { "type": "int64", "optional": true, "field": "lsn" },
>>>>>   { "type": "int64", "optional": true, "field": "xmin" }
>>>>> ],
>>>>> "optional": false,
>>>>> "name": "io.debezium.connector.postgresql.Source",
>>>>> "field": "source"
>>>>>   },
>>>>>   { "type": "string", "optional": false, "field": "op" },
>>>>>   { "type": "int64", "optional": true, "field": "ts_ms" },
>>>>>   {
>>>>> "type": "struct",
>>>>> "fields": [
>>>>>   { "type": "string", "optional": false, "field": "id" },
>>>>>   { "type": "int64", "optional": false, "field": "total_order" },
>>>>>   {
>>>>> "type": "int64",
>>>>> "optional": false,
>>>>> "field": "data_collection_order"
>>>>>   }
>>>>> ],
>>>>> "optional": true,
>>>>> "field": "transaction"
>>>>>   }
>>>>> ],
>>>>> "optional": false,
>>>>> "name": "db.public.data.Envelope"
>>>>>   },
>>>>>   "payload": {
>>>>> "before": null,
>>>>> "after": {
>>>>>   "id": 76704,
>>>>>   "roles": [null],
>>>>> },
>>>>> "source": {
>>>>>   "version": "1.3.0.Final",
>>>>>   "connector": "postgresql",
>>>>>   "name": "db",
>>>>>   "ts_ms": 1605739197360,
>>>>>   "snapshot": "true",
>>>>>   "db": "d

Re: Filter Null in Array in SQL Connector

2020-11-30 Thread Rex Fenley
>>>>   { "type": "string", "optional": false, "field": "version" },
>>>>   { "type": "string", "optional": false, "field": "connector" },
>>>>   { "type": "string", "optional": false, "field": "name" },
>>>>   { "type": "int64", "optional": false, "field": "ts_ms" },
>>>>   {
>>>> "type": "string",
>>>> "optional": true,
>>>> "name": "io.debezium.data.Enum",
>>>> "version": 1,
>>>> "parameters": { "allowed": "true,last,false" },
>>>> "default": "false",
>>>> "field": "snapshot"
>>>>   },
>>>>   { "type": "string", "optional": false, "field": "db" },
>>>>   { "type": "string", "optional": false, "field": "schema" },
>>>>   { "type": "string", "optional": false, "field": "table" },
>>>>   { "type": "int64", "optional": true, "field": "txId" },
>>>>   { "type": "int64", "optional": true, "field": "lsn" },
>>>>   { "type": "int64", "optional": true, "field": "xmin" }
>>>> ],
>>>> "optional": false,
>>>> "name": "io.debezium.connector.postgresql.Source",
>>>> "field": "source"
>>>>   },
>>>>   { "type": "string", "optional": false, "field": "op" },
>>>>   { "type": "int64", "optional": true, "field": "ts_ms" },
>>>>   {
>>>> "type": "struct",
>>>> "fields": [
>>>>   { "type": "string", "optional": false, "field": "id" },
>>>>   { "type": "int64", "optional": false, "field": "total_order" },
>>>>   {
>>>> "type": "int64",
>>>> "optional": false,
>>>> "field": "data_collection_order"
>>>>   }
>>>> ],
>>>> "optional": true,
>>>> "field": "transaction"
>>>>   }
>>>> ],
>>>> "optional": false,
>>>> "name": "db.public.data.Envelope"
>>>>   },
>>>>   "payload": {
>>>> "before": null,
>>>> "after": {
>>>>   "id": 76704,
>>>>   "roles": [null],
>>>> },
>>>> "source": {
>>>>   "version": "1.3.0.Final",
>>>>   "connector": "postgresql",
>>>>   "name": "db",
>>>>   "ts_ms": 1605739197360,
>>>>   "snapshot": "true",
>>>>   "db": "db",
>>>>   "schema": "public",
>>>>   "table": "data",
>>>>   "txId": 1784,
>>>>   "lsn": 1305806608,
>>>>   "xmin": null
>>>> },
>>>> "op": "r",
>>>> "ts_ms": 1605739197373,
>>>> "transaction": null
>>>>   }
>>>> }
>>>>
>>>> cc Brad
>>>>
>>>> On Thu, Nov 19, 2020 at 7:29 AM Dylan Forciea  wrote:
>>>>
>>>>> Ah yes, I missed the Kafka part and only saw the array part. FLINK-19771
>>>>> was definitely confined to the Postgres-specific code.
>>>>>
>>>>>
>>>>>
>>>>> Dylan
>>>>>
>>>>>
>>>>>

Re: Filter Null in Array in SQL Connector

2020-11-20 Thread Rex Fenley
>>> "optional": true,
>>> "name": "io.debezium.data.Enum",
>>> "version": 1,
>>> "parameters": { "allowed": "true,last,false" },
>>> "default": "false",
>>> "field": "snapshot"
>>>   },
>>>   { "type": "string", "optional": false, "field": "db" },
>>>   { "type": "string", "optional": false, "field": "schema" },
>>>   { "type": "string", "optional": false, "field": "table" },
>>>   { "type": "int64", "optional": true, "field": "txId" },
>>>   { "type": "int64", "optional": true, "field": "lsn" },
>>>   { "type": "int64", "optional": true, "field": "xmin" }
>>> ],
>>> "optional": false,
>>> "name": "io.debezium.connector.postgresql.Source",
>>> "field": "source"
>>>   },
>>>   { "type": "string", "optional": false, "field": "op" },
>>>   { "type": "int64", "optional": true, "field": "ts_ms" },
>>>   {
>>> "type": "struct",
>>> "fields": [
>>>   { "type": "string", "optional": false, "field": "id" },
>>>   { "type": "int64", "optional": false, "field": "total_order" },
>>>   {
>>> "type": "int64",
>>> "optional": false,
>>> "field": "data_collection_order"
>>>   }
>>> ],
>>> "optional": true,
>>> "field": "transaction"
>>>   }
>>> ],
>>> "optional": false,
>>> "name": "db.public.data.Envelope"
>>>   },
>>>   "payload": {
>>> "before": null,
>>> "after": {
>>>   "id": 76704,
>>>   "roles": [null],
>>> },
>>> "source": {
>>>   "version": "1.3.0.Final",
>>>   "connector": "postgresql",
>>>   "name": "db",
>>>   "ts_ms": 1605739197360,
>>>   "snapshot": "true",
>>>   "db": "db",
>>>   "schema": "public",
>>>   "table": "data",
>>>   "txId": 1784,
>>>   "lsn": 1305806608,
>>>   "xmin": null
>>> },
>>> "op": "r",
>>> "ts_ms": 1605739197373,
>>> "transaction": null
>>>   }
>>> }
>>>
>>> cc Brad
>>>
>>> On Thu, Nov 19, 2020 at 7:29 AM Dylan Forciea  wrote:
>>>
>>>> Ah yes, I missed the Kafka part and only saw the array part. FLINK-19771
>>>> was definitely confined to the Postgres-specific code.
>>>>
>>>>
>>>>
>>>> Dylan
>>>>
>>>>
>>>>
>>>> *From: *Jark Wu 
>>>> *Date: *Thursday, November 19, 2020 at 9:12 AM
>>>> *To: *Dylan Forciea 
>>>> *Cc: *Danny Chan, Rex Fenley, Flink ML
>>>> *Subject: *Re: Filter Null in Array in SQL Connector
>>>>
>>>>
>>>>
>>>> Hi Dylan,
>>>>
>>>>
>>>>
>>>> I think Rex has run into a different issue, because he is using Kafka
>>>> with the Debezium format.
>>>>
>>>>
>>>>
>>>> Hi Rex,
>>>>
>>>>
>>>>
>>>> If you can share the JSON data and the exception stack trace, that would
>>>> be helpful!
>>>>
>>>>
>>>>
>>>> Besides that, you can try enabling the 'debezium-json.ignore-parse-errors'
>>>> option [1] to skip the dirty data.
>>>>
>>>>
>>>>
>>>> Best,
>>>>
>>>> Jark
>>>>
>>>>
>>>>
>>>> [1]:
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors
>>>>
>>>>
>>>>
>>>> On Thu, 19 Nov 2020 at 21:13, Dylan Forciea  wrote:
>>>>
>>>> Do you mean that the array contains values that are null, or that the
>>>> entire array itself is null? If it’s the latter, I have filed an issue,
>>>> along with a PR to fix it that is pending review [1].
>>>>
>>>>
>>>>
>>>> Regards,
>>>>
>>>> Dylan Forciea
>>>>
>>>>
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-19771
>>>>
>>>>
>>>>
>>>> *From: *Danny Chan 
>>>> *Date: *Thursday, November 19, 2020 at 2:24 AM
>>>> *To: *Rex Fenley 
>>>> *Cc: *Flink ML 
>>>> *Subject: *Re: Filter Null in Array in SQL Connector
>>>>
>>>>
>>>>
>>>> Hi, Fenley ~
>>>>
>>>>
>>>>
>>>> You are right: parsing nulls in an ARRAY field is not supported yet. I
>>>> have logged an issue [1] and will fix it soon ~
>>>>
>>>>
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-20234
>>>>
>>>>
>>>>
>>>> On Thu, Nov 19, 2020 at 2:51 PM Rex Fenley wrote:
>>>>
>>>> Hi,
>>>>
>>>>
>>>>
>>>> I recently discovered that some of our data has NULL values arriving in an
>>>> ARRAY column. This column is consumed by Flink via the Kafka connector
>>>> with the Debezium format. We seem to be receiving NullPointerExceptions
>>>> when these NULL values in the arrays arrive, which restarts the source
>>>> operator in a loop.
>>>>
>>>>
>>>>
>>>> Is there any way in Flink to avoid the exception, or to filter out NULLs
>>>> in an Array of Strings?
>>>>
>>>>
>>>>
>>>> We're somewhat stuck on how to solve this problem; we'd like to be
>>>> defensive about this on Flink's side.
>>>>
>>>>
>>>>
>>>> Thanks!
>>>>
>>>>
>>>>
>>>> (P.S. The exception was not very informative; there may be room for
>>>> improvement in the form of a richer error message when this happens.)
>>>>
>>>>
>>>> --
>>>>
>>>> *Rex Fenley*  |  Software Engineer - Mobile and Backend
>>>>
>>>>
>>>>
>>>> *Remind.com* <https://www.remind.com/> |  BLOG
>>>> <http://blog.remind.com/>  |  FOLLOW US <https://twitter.com/remindhq>
>>>>  |  *LIKE US <https://www.facebook.com/remindhq>*
>>>>
>>>>
>>>
>>>
>>
>
>




Re: Filter Null in Array in SQL Connector

2020-11-19 Thread Rex Fenley
Thanks!

Update: We've now confirmed with a test copy of our data that if we remove
all the null values from the arrays, everything works smoothly and as
expected. So this definitely appears to be the culprit.
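For the workaround described above (removing the null elements before the data reaches Flink), a minimal pre-processing sketch in Python might look like the following. It assumes the Debezium JSON events can be rewritten somewhere upstream of the Flink Kafka source, for example in a small relay consumer; the helpers `drop_nulls` and `sanitize_event` are hypothetical and not part of Flink or Debezium. Note that this changes the data: a Postgres `{NULL}` array becomes an empty array.

```python
import json
from typing import Any


def drop_nulls(value: Any) -> Any:
    """Recursively remove None elements from lists; other values are kept."""
    if isinstance(value, list):
        return [drop_nulls(v) for v in value if v is not None]
    if isinstance(value, dict):
        return {k: drop_nulls(v) for k, v in value.items()}
    return value


def sanitize_event(raw: str) -> str:
    """Strip null array elements from the before/after row images of a
    Debezium JSON event, with or without the schema/payload envelope.
    Hypothetical helper, meant to run upstream of the Flink source."""
    event = json.loads(raw)
    if not isinstance(event, dict):
        return raw  # e.g. a tombstone ("null"); nothing to clean
    payload = event.get("payload", event)
    if not isinstance(payload, dict):
        return raw
    for image in ("before", "after"):
        if payload.get(image) is not None:
            payload[image] = drop_nulls(payload[image])
    return json.dumps(event)
```

Applied to an event whose `after` image is `{"id": 76704, "roles": [null]}`, the helper yields `"roles": []` and leaves `"before": null` untouched. Top-level null columns (as opposed to null elements) are deliberately not changed.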

On Thu, Nov 19, 2020 at 6:41 PM Jark Wu  wrote:

> Thanks Rex! This is very helpful. Will check it out later.
>
>
> On Fri, 20 Nov 2020 at 03:02, Rex Fenley  wrote:
>
>> Below is a highly redacted set of data that should represent the problem.
>> As you can see, the "roles" field has "[null]" in it, a null value within
>> the array. We also see corresponding rows in our DB like the following:
>>
>>     id    | roles
>> ----------+--------
>>  16867433 | {NULL}
>>
>> We have confirmed that when "roles" is not selected, all data passes
>> through without failure on a single operator, but selecting "roles" will
>> eventually always fail with java.lang.NullPointerException, repeatedly.
>> What is odd is that there is no additional stack trace, just the
>> exception, in our logs and in the Flink UI. We only have INFO logging on;
>> however, other exceptions we've encountered during development have always
>> included a stack trace.
>>
>> {
>>   "schema": {
>> "type": "struct",
>> "fields": [
>>   {
>> "type": "struct",
>> "fields": [
>>   { "type": "int32", "optional": false, "field": "id" },
>>   {
>> "type": "array",
>> "items": { "type": "string", "optional": true },
>> "optional": false,
>> "field": "roles"
>>   },
>> ],
>> "optional": true,
>> "name": "db.public.data.Value",
>> "field": "before"
>>   },
>>   {
>> "type": "struct",
>> "fields": [
>>   { "type": "int32", "optional": false, "field": "id" },
>>   {
>> "type": "array",
>> "items": { "type": "string", "optional": true },
>> "optional": false,
>> "field": "roles"
>>   },
>> ],
>> "optional": true,
>> "name": "db.public.data.Value",
>> "field": "after"
>>   },
>>   {
>> "type": "struct",
>> "fields": [
>>   { "type": "string", "optional": false, "field": "version" },
>>   { "type": "string", "optional": false, "field": "connector" },
>>   { "type": "string", "optional": false, "field": "name" },
>>   { "type": "int64", "optional": false, "field": "ts_ms" },
>>   {
>> "type": "string",
>> "optional": true,
>> "name": "io.debezium.data.Enum",
>> "version": 1,
>> "parameters": { "allowed": "true,last,false" },
>> "default": "false",
>> "field": "snapshot"
>>   },
>>   { "type": "string", "optional": false, "field": "db" },
>>   { "type": "string", "optional": false, "field": "schema" },
>>   { "type": "string", "optional": false, "field": "table" },
>>   { "type": "int64", "optional": true, "field": "txId" },
>>   { "type": "int64", "optional": true, "field": "lsn" },
>>   { "type": "int64", "optional": true, "field": "xmin" }
>> ],
>> "optional": false,
>> "name": "io.debezium.connector.postgresql.Source",
>>  

Re: Filter Null in Array in SQL Connector

2020-11-19 Thread Danny Chan
  "field":"id"
   },
   {
  "type":"int64",
  "optional":false,
  "field":"total_order"
   },
   {
  "type":"int64",
  "optional":false,
  "field":"data_collection_order"
   }
],
"optional":true,
"field":"transaction"
 }
  ],
  "optional":false,
  "name":"db.public.data.Envelope"
   },
   "payload":{
  "before":null,
  "after":{
 "id":76704,
 "roles":[
null
 ]
  },
  "source":{
 "version":"1.3.0.Final",
 "connector":"postgresql",
 "name":"db",
 "ts_ms":1605739197360,
 "snapshot":"true",
 "db":"db",
 "schema":"public",
 "table":"data",
 "txId":1784,
 "lsn":1305806608,
 "xmin":null
  },
  "op":"r",
  "ts_ms":1605739197373,
  "transaction":null
   }
}

This works correctly. I reformatted it because the original was not valid
JSON.
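The point about invalid JSON is easy to reproduce with any strict parser. The sketch below uses Python's standard `json` module on an abbreviated, hypothetical excerpt of the `after` payload: the variant with the trailing comma is rejected, the reformatted one parses, and the null element comes back as `None`. It says nothing about how Flink's own deserializer treats either input.

```python
import json
from typing import Any, Optional

# Abbreviated excerpt of the "after" image; the trailing comma after "roles"
# mirrors the one in the original sample and makes the text invalid JSON.
INVALID = '{"id": 76704, "roles": [null],}'
VALID = '{"id": 76704, "roles": [null]}'


def try_parse(text: str) -> Optional[Any]:
    """Return the parsed value, or None if the text is not valid JSON."""
    try:
        return json.loads(text)
    except json.JSONDecodeError as err:
        print(f"rejected: {err.msg} at position {err.pos}")
        return None


print(try_parse(INVALID))  # prints the rejection message, then None
print(try_parse(VALID))    # {'id': 76704, 'roles': [None]}
```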

On Fri, Nov 20, 2020 at 3:02 AM Rex Fenley wrote:

> Below is a highly redacted set of data that should represent the problem.
> As you can see, the "roles" field has "[null]" in it, a null value within
> the array. We also see corresponding rows in our DB like the following:
>
>     id    | roles
> ----------+--------
>  16867433 | {NULL}
>
> We have confirmed that when "roles" is not selected, all data passes
> through without failure on a single operator, but selecting "roles" will
> eventually always fail with java.lang.NullPointerException, repeatedly.
> What is odd is that there is no additional stack trace, just the
> exception, in our logs and in the Flink UI. We only have INFO logging on;
> however, other exceptions we've encountered during development have always
> included a stack trace.
>
> {
>   "schema": {
> "type": "struct",
> "fields": [
>   {
> "type": "struct",
> "fields": [
>   { "type": "int32", "optional": false, "field": "id" },
>   {
> "type": "array",
> "items": { "type": "string", "optional": true },
> "optional": false,
> "field": "roles"
>   },
> ],
> "optional": true,
> "name": "db.public.data.Value",
> "field": "before"
>   },
>   {
> "type": "struct",
> "fields": [
>   { "type": "int32", "optional": false, "field": "id" },
>   {
> "type": "array",
> "items": { "type": "string", "optional": true },
> "optional": false,
> "field": "roles"
>   },
> ],
> "optional": true,
> "name": "db.public.data.Value",
> "field": "after"
>   },
>   {
> "type": "struct",
> "fields": [
>   { "type": "string", "optional": false, "field": "version" },
>   { "type": "string", "optional": false, "field": "connector" },
>   { "type": "string", "optional": false, "field": "name" },
>   { "type": "int64", "optional": false, "field": "ts_ms" },
>   {
> "type": "string",
> "optional": true,
> "name": "io.debezium.data.Enum",
> "version": 1,
> "parameters": { "allowed": "true,last,false" },
> "default": "false",
> "field": "snapshot"
>   },
>  

Re: Filter Null in Array in SQL Connector

2020-11-19 Thread Jark Wu
Thanks Rex! This is very helpful. Will check it out later.


On Fri, 20 Nov 2020 at 03:02, Rex Fenley  wrote:

> Below is a highly redacted set of data that should represent the problem.
> As you can see, the "roles" field has "[null]" in it, a null value within
> the array. We also see corresponding rows in our DB like the following:
>
>     id    | roles
> ----------+--------
>  16867433 | {NULL}
>
> We have confirmed that when "roles" is not selected, all data passes
> through without failure on a single operator, but selecting "roles" will
> eventually always fail with java.lang.NullPointerException, repeatedly.
> What is odd is that there is no additional stack trace, just the
> exception, in our logs and in the Flink UI. We only have INFO logging on;
> however, other exceptions we've encountered during development have always
> included a stack trace.
>
> {
>   "schema": {
> "type": "struct",
> "fields": [
>   {
> "type": "struct",
> "fields": [
>   { "type": "int32", "optional": false, "field": "id" },
>   {
> "type": "array",
> "items": { "type": "string", "optional": true },
> "optional": false,
> "field": "roles"
>   },
> ],
> "optional": true,
> "name": "db.public.data.Value",
> "field": "before"
>   },
>   {
> "type": "struct",
> "fields": [
>   { "type": "int32", "optional": false, "field": "id" },
>   {
> "type": "array",
> "items": { "type": "string", "optional": true },
> "optional": false,
> "field": "roles"
>   },
> ],
> "optional": true,
> "name": "db.public.data.Value",
> "field": "after"
>   },
>   {
> "type": "struct",
> "fields": [
>   { "type": "string", "optional": false, "field": "version" },
>   { "type": "string", "optional": false, "field": "connector" },
>   { "type": "string", "optional": false, "field": "name" },
>   { "type": "int64", "optional": false, "field": "ts_ms" },
>   {
> "type": "string",
> "optional": true,
> "name": "io.debezium.data.Enum",
> "version": 1,
> "parameters": { "allowed": "true,last,false" },
> "default": "false",
> "field": "snapshot"
>   },
>   { "type": "string", "optional": false, "field": "db" },
>   { "type": "string", "optional": false, "field": "schema" },
>   { "type": "string", "optional": false, "field": "table" },
>   { "type": "int64", "optional": true, "field": "txId" },
>   { "type": "int64", "optional": true, "field": "lsn" },
>   { "type": "int64", "optional": true, "field": "xmin" }
> ],
> "optional": false,
> "name": "io.debezium.connector.postgresql.Source",
> "field": "source"
>   },
>   { "type": "string", "optional": false, "field": "op" },
>   { "type": "int64", "optional": true, "field": "ts_ms" },
>   {
> "type": "struct",
> "fields": [
>   { "type": "string", "optional": false, "field": "id" },
>   { "type": "int64", "optional":

Re: Filter Null in Array in SQL Connector

2020-11-19 Thread Rex Fenley
Below is a highly redacted set of data that should represent the problem.
As you can see, the "roles" field has "[null]" in it, a null value within
the array. We also see corresponding rows in our DB like the following:

    id    | roles
----------+--------
 16867433 | {NULL}

We have confirmed that when "roles" is not selected, all data passes
through without failure on a single operator, but selecting "roles" will
eventually always fail with java.lang.NullPointerException, repeatedly.
What is odd is that there is no additional stack trace, just the
exception, in our logs and in the Flink UI. We only have INFO logging on;
however, other exceptions we've encountered during development have always
included a stack trace.

{
  "schema": {
"type": "struct",
"fields": [
  {
"type": "struct",
"fields": [
  { "type": "int32", "optional": false, "field": "id" },
  {
"type": "array",
"items": { "type": "string", "optional": true },
"optional": false,
"field": "roles"
  },
],
"optional": true,
"name": "db.public.data.Value",
"field": "before"
  },
  {
"type": "struct",
"fields": [
  { "type": "int32", "optional": false, "field": "id" },
  {
"type": "array",
"items": { "type": "string", "optional": true },
"optional": false,
"field": "roles"
  },
],
"optional": true,
"name": "db.public.data.Value",
"field": "after"
  },
  {
"type": "struct",
"fields": [
  { "type": "string", "optional": false, "field": "version" },
  { "type": "string", "optional": false, "field": "connector" },
  { "type": "string", "optional": false, "field": "name" },
  { "type": "int64", "optional": false, "field": "ts_ms" },
  {
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": { "allowed": "true,last,false" },
"default": "false",
"field": "snapshot"
  },
  { "type": "string", "optional": false, "field": "db" },
  { "type": "string", "optional": false, "field": "schema" },
  { "type": "string", "optional": false, "field": "table" },
  { "type": "int64", "optional": true, "field": "txId" },
  { "type": "int64", "optional": true, "field": "lsn" },
  { "type": "int64", "optional": true, "field": "xmin" }
],
"optional": false,
"name": "io.debezium.connector.postgresql.Source",
"field": "source"
  },
  { "type": "string", "optional": false, "field": "op" },
  { "type": "int64", "optional": true, "field": "ts_ms" },
  {
"type": "struct",
"fields": [
  { "type": "string", "optional": false, "field": "id" },
  { "type": "int64", "optional": false, "field": "total_order" },
  {
"type": "int64",
"optional": false,
"field": "data_collection_order"
  }
],
"optional": true,
"field": "transaction"
  }
],
"optional": false,
"name": "db.public.data.Envelope"
  },
  "payload": {
"before": null,
"after": {

Re: Filter Null in Array in SQL Connector

2020-11-19 Thread Dylan Forciea
Ah yes, I missed the Kafka part and only saw the array part. FLINK-19771
was definitely confined to the Postgres-specific code.

Dylan

From: Jark Wu 
Date: Thursday, November 19, 2020 at 9:12 AM
To: Dylan Forciea 
Cc: Danny Chan, Rex Fenley, Flink ML
Subject: Re: Filter Null in Array in SQL Connector

Hi Dylan,

I think Rex has run into a different issue, because he is using Kafka with
the Debezium format.

Hi Rex,

If you can share the JSON data and the exception stack trace, that would be helpful!

Besides that, you can try enabling the 'debezium-json.ignore-parse-errors'
option [1] to skip the dirty data.

Best,
Jark

[1]: 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors

On Thu, 19 Nov 2020 at 21:13, Dylan Forciea <dy...@oseberg.io> wrote:
Do you mean that the array contains values that are null, or that the entire
array itself is null? If it’s the latter, I have filed an issue, along with a
PR to fix it that is pending review [1].

Regards,
Dylan Forciea

[1] https://issues.apache.org/jira/browse/FLINK-19771

From: Danny Chan <danny0...@apache.org>
Date: Thursday, November 19, 2020 at 2:24 AM
To: Rex Fenley <r...@remind101.com>
Cc: Flink ML <user@flink.apache.org>
Subject: Re: Filter Null in Array in SQL Connector

Hi, Fenley ~

You are right: parsing nulls in an ARRAY field is not supported yet. I have
logged an issue [1] and will fix it soon ~

[1] https://issues.apache.org/jira/browse/FLINK-20234

On Thu, Nov 19, 2020 at 2:51 PM Rex Fenley <r...@remind101.com> wrote:
Hi,

I recently discovered that some of our data has NULL values arriving in an
ARRAY column. This column is consumed by Flink via the Kafka connector with
the Debezium format. We seem to be receiving NullPointerExceptions when these
NULL values in the arrays arrive, which restarts the source operator in a
loop.

Is there any way in Flink to avoid the exception, or to filter out NULLs in an
Array of Strings?

We're somewhat stuck on how to solve this problem; we'd like to be defensive
about this on Flink's side.

Thanks!

(P.S. The exception was not very informative; there may be room for
improvement in the form of a richer error message when this happens.)



Re: Filter Null in Array in SQL Connector

2020-11-19 Thread Jark Wu
Hi Dylan,

I think Rex has run into a different issue, because he is using Kafka with
the Debezium format.

Hi Rex,

If you can share the JSON data and the exception stack trace, that would be
helpful!

Besides that, you can try enabling the 'debezium-json.ignore-parse-errors'
option [1] to skip the dirty data.

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors
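According to the linked documentation, 'debezium-json.ignore-parse-errors' makes the format skip fields and rows with parse errors instead of failing. The Python sketch below only illustrates that "skip instead of fail" trade-off on plain JSON strings; `parse_lenient` is a hypothetical helper, not Flink's implementation, and this thread does not establish whether the option also covers the NullPointerException reported here. Counting what gets dropped matters, because skipped rows otherwise disappear silently.

```python
import json
from typing import Iterable, List, Tuple


def parse_lenient(records: Iterable[str]) -> Tuple[List[dict], int]:
    """Parse JSON records, skipping (and counting) the ones that fail.

    Illustration of "skip instead of fail" only; not Flink's code.
    """
    parsed: List[dict] = []
    skipped = 0
    for raw in records:
        try:
            parsed.append(json.loads(raw))
        except json.JSONDecodeError:
            skipped += 1  # a strict parser would fail the whole job here
    return parsed, skipped


rows, skipped = parse_lenient(['{"id": 1}', '{"id": 2,}', '{"id": 3}'])
print([r["id"] for r in rows], skipped)  # [1, 3] 1
```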

On Thu, 19 Nov 2020 at 21:13, Dylan Forciea  wrote:

> Do you mean that the array contains values that are null, or that the
> entire array itself is null? If it’s the latter, I have filed an issue,
> along with a PR to fix it that is pending review [1].
>
>
>
> Regards,
>
> Dylan Forciea
>
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-19771
>
>
>
> *From: *Danny Chan 
> *Date: *Thursday, November 19, 2020 at 2:24 AM
> *To: *Rex Fenley 
> *Cc: *Flink ML 
> *Subject: *Re: Filter Null in Array in SQL Connector
>
>
>
> Hi, Fenley ~
>
>
>
> You are right: parsing nulls in an ARRAY field is not supported yet. I have
> logged an issue [1] and will fix it soon ~
>
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-20234
>
>
>
> On Thu, Nov 19, 2020 at 2:51 PM Rex Fenley wrote:
>
> Hi,
>
>
>
> I recently discovered that some of our data has NULL values arriving in an
> ARRAY column. This column is consumed by Flink via the Kafka connector
> with the Debezium format. We seem to be receiving NullPointerExceptions
> when these NULL values in the arrays arrive, which restarts the source
> operator in a loop.
>
>
>
> Is there any way in Flink to avoid the exception, or to filter out NULLs
> in an Array of Strings?
>
>
>
> We're somewhat stuck on how to solve this problem; we'd like to be
> defensive about this on Flink's side.
>
>
>
> Thanks!
>
>
>
> (P.S. The exception was not very informative; there may be room for
> improvement in the form of a richer error message when this happens.)
>
>
>
>


Re: Filter Null in Array in SQL Connector

2020-11-19 Thread Dylan Forciea
Do you mean that the array contains values that are null, or that the entire
array itself is null? If it’s the latter, I have filed an issue, along with a
PR to fix it that is pending review [1].

Regards,
Dylan Forciea

[1] https://issues.apache.org/jira/browse/FLINK-19771
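The question above separates two cases that look different in the Debezium JSON: `"roles": null` (the whole array is null, the case tracked in FLINK-19771) versus `"roles": [null]` (a null element, as in Rex's sample elsewhere in this thread). A small, hypothetical Python helper for telling them apart when inspecting captured events might look like this; `classify_array_field` is an illustrative name, not an existing API.

```python
import json
from typing import Optional


def classify_array_field(row: Optional[dict], field: str) -> str:
    """Say which kind of null, if any, an array column carries in a row image."""
    if row is None:
        return "no row image"  # e.g. "before" on a snapshot read
    if field not in row:
        return "field missing"
    value = row[field]
    if value is None:
        return "array itself is null"
    if any(element is None for element in value):
        return "array contains null elements"
    return "no nulls"


after = json.loads('{"id": 76704, "roles": [null]}')
print(classify_array_field(after, "roles"))  # array contains null elements
```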

From: Danny Chan 
Date: Thursday, November 19, 2020 at 2:24 AM
To: Rex Fenley 
Cc: Flink ML 
Subject: Re: Filter Null in Array in SQL Connector

Hi, Fenley ~

You are right: parsing nulls in an ARRAY field is not supported yet. I have
logged an issue [1] and will fix it soon ~

[1] https://issues.apache.org/jira/browse/FLINK-20234

On Thu, Nov 19, 2020 at 2:51 PM Rex Fenley <r...@remind101.com> wrote:
Hi,

I recently discovered that some of our data has NULL values arriving in an
ARRAY column. This column is consumed by Flink via the Kafka connector with
the Debezium format. We seem to be receiving NullPointerExceptions when these
NULL values in the arrays arrive, which restarts the source operator in a
loop.

Is there any way in Flink to avoid the exception, or to filter out NULLs in an
Array of Strings?

We're somewhat stuck on how to solve this problem; we'd like to be defensive
about this on Flink's side.

Thanks!

(P.S. The exception was not very informative; there may be room for
improvement in the form of a richer error message when this happens.)



Re: Filter Null in Array in SQL Connector

2020-11-19 Thread Danny Chan
Can you also share the problematic JSON string here, so that we can pin
down the specific cause of the error?
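To find which records to share, one option is to scan captured events for null elements inside arrays. The Python sketch below is an assumption-laden illustration (the helper `null_element_paths` is hypothetical): it reports a JSONPath-style location for every `null` that sits directly inside a list, so an event like Rex's sample would be flagged at `$.payload.after.roles[0]`. Plain null column values such as `"xmin": null` are not reported.

```python
import json
from typing import Any, List


def null_element_paths(value: Any, path: str = "$") -> List[str]:
    """List JSONPath-like locations of None elements inside lists."""
    found: List[str] = []
    if isinstance(value, dict):
        for key, child in value.items():
            found.extend(null_element_paths(child, f"{path}.{key}"))
    elif isinstance(value, list):
        for i, child in enumerate(value):
            if child is None:
                found.append(f"{path}[{i}]")
            else:
                found.extend(null_element_paths(child, f"{path}[{i}]"))
    return found


event = json.loads('{"payload": {"before": null, "after": {"id": 76704, "roles": [null]}}}')
print(null_element_paths(event))  # ['$.payload.after.roles[0]']
```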

On Thu, Nov 19, 2020 at 2:51 PM Rex Fenley wrote:

> Hi,
>
> I recently discovered that some of our data has NULL values arriving in an
> ARRAY column. This column is consumed by Flink via the Kafka connector
> with the Debezium format. We seem to be receiving NullPointerExceptions
> when these NULL values in the arrays arrive, which restarts the source
> operator in a loop.
>
> Is there any way in Flink to avoid the exception, or to filter out NULLs
> in an Array of Strings?
>
> We're somewhat stuck on how to solve this problem; we'd like to be
> defensive about this on Flink's side.
>
> Thanks!
>
> (P.S. The exception was not very informative; there may be room for
> improvement in the form of a richer error message when this happens.)
>
>


Re: Filter Null in Array in SQL Connector

2020-11-19 Thread Danny Chan
Hi, Fenley ~

You are right: parsing nulls in an ARRAY field is not supported yet. I have
logged an issue [1] and will fix it soon ~

[1] https://issues.apache.org/jira/browse/FLINK-20234

On Thu, Nov 19, 2020 at 2:51 PM Rex Fenley wrote:

> Hi,
>
> I recently discovered that some of our data has NULL values arriving in an
> ARRAY column. This column is consumed by Flink via the Kafka connector
> with the Debezium format. We seem to be receiving NullPointerExceptions
> when these NULL values in the arrays arrive, which restarts the source
> operator in a loop.
>
> Is there any way in Flink to avoid the exception, or to filter out NULLs
> in an Array of Strings?
>
> We're somewhat stuck on how to solve this problem; we'd like to be
> defensive about this on Flink's side.
>
> Thanks!
>
> (P.S. The exception was not very informative; there may be room for
> improvement in the form of a richer error message when this happens.)
>
>